
# PySpark - From Zero to Hero (March 2019)


https://towardsdatascience.com/introduction-to-spark-nlp-foundations-and-basic-components-part-i-c83b7629ed59



https://github.com/vkocaman/PySpark_Essentials_March_2019/blob/master/PySpark%20-%20From%20Zero%20to%20Hero%20(Sept%202019).ipynb

### Overview of PySpark

- Apache Spark is written in Scala

- To support Python with Spark, Apache Spark Community released PySpark

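- For DataFrame and SQL operations, performance is broadly similar to Scala, because the work runs inside the JVM; RDD code that calls Python functions is usually slower, since data has to be serialized between the JVM and the Python worker processes

- The PySpark DataFrame API will feel familiar to Pandas users, and its machine learning library (MLlib) follows a Scikit-learn-like pipeline style

- The high-level components of a Spark application include the Spark driver, the Spark executors and the cluster manager.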

- Spark supports several cluster managers:

    - Spark's built-in standalone cluster manager
    - Apache Mesos
    - Hadoop YARN
    - Kubernetes (since Spark 2.3)

- Execution modes:

    - Cluster mode
    - Client mode (the default deploy mode for spark-submit)
    - Local mode

### What is the Spark shell?

- Interactive environment for running Spark jobs

- Helpful for fast interactive prototyping

- Spark’s shells allow interacting with data on disk or in memory

- Three different Spark shells:

    - spark-shell for Scala
    - pyspark for Python
    - sparkR for R


### PySpark shell

- The PySpark shell is the Python-based command-line tool

- The PySpark shell lets data scientists interface with Spark data structures

- The PySpark shell supports connecting to a cluster


### Understanding SparkContext

- SparkContext is an entry point into the world of Spark

- An entry point is a way of connecting to Spark cluster

- An entry point is like a key to the house

- The PySpark shell provides a default SparkContext called sc (in a notebook or script you create it yourself, as in the next cell)


In [None]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

# Run Spark locally on all available CPU cores ("local[*]").
# To use a standalone cluster instead, pass its master URL, e.g.
# .setMaster('spark://fs.das3.liacs.nl:7077')
conf = SparkConf().setAppName("SDDM").setMaster("local[*]")

# getOrCreate() returns the already running SparkContext, if any,
# instead of raising a ValueError about multiple SparkContexts
sc = SparkContext.getOrCreate(conf=conf)

# Legacy entry point for DataFrames; SparkSession (introduced later) supersedes it
sqlContext = SQLContext(sc)

# ==>> DO NOT FORGET WHEN YOU'RE DONE >> sc.stop()

In [None]:
# Run this only when you are finished: the remaining cells need an active SparkContext
sc.stop()

In [None]:
# Master can be local[*], spark:// , yarn, etc.
# SparkContext available as sc, SQLContext available as sqlContext.

# Inspecting SparkContext
# Version: To retrieve SparkContext version

print(sc.version)

# Python Version: the Python version used by the SparkContext

print(sc.pythonVer)

# Master: URL of the cluster, or a "local" string when running in local mode

print(sc.master)

2.4.0
3.6
local[*]


In [None]:
# Spark execution plan showing lazy evaluation, with a Word Count example

# https://github.com/tirthajyoti/Spark-with-Python/blob/master/Word_Count.ipynb

# SparkContext: number of workers and lazy evaluation

# https://github.com/tirthajyoti/Spark-with-Python/blob/master/SparkContext_Workers_Lazy_Evaluations.ipynb


## Spark RDD (Resilient Distributed Datasets)

### Basics of RDD

Resilient Distributed Datasets (RDD) is a fundamental data structure of Spark. It is an immutable distributed collection of objects. Each dataset in RDD is divided into logical partitions, which may be computed on different nodes of the cluster. RDDs can contain any type of Python, Java, or Scala objects, including user-defined classes.

Spark uses RDDs to perform MapReduce-style operations faster and more efficiently, notably by keeping intermediate data in memory instead of writing it to disk between steps.

Formally, an RDD is a read-only, partitioned collection of records. RDDs can be created through deterministic operations on either data on stable storage or other RDDs. RDD is a fault-tolerant collection of elements that can be operated on in parallel.

Spark's core data structure is the Resilient Distributed Dataset (RDD). This is a low-level object that lets Spark work its magic by splitting data across multiple nodes in the cluster.

Resilient: Ability to withstand failures

Distributed: Spanning across multiple machines

Datasets: Collection of partitioned data, e.g. arrays, tables, tuples, etc.

RDDs are:
- Lazily evaluated (transformations are lazy, actions are eager)
- Recomputed on node failure, from their lineage of transformations
- Distributed across the cluster

Transformations (lazy):

- map
- filter
- flatMap
- reduceByKey
- join
- cogroup

Actions (eager):

- count
- reduce
- collect
- take
- saveAsTextFile
- saveAsHadoopFile
- countByValue
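To make the lazy/eager split concrete, here is a toy, single-machine model in plain Python. `ToyRDD` is a hypothetical class, not part of PySpark: its transformations only record a step, and nothing runs until the `collect()` action is called.

```python
class ToyRDD:
    """Hypothetical single-machine stand-in for an RDD (not part of PySpark)."""

    def __init__(self, data, steps=()):
        self._data = list(data)
        self._steps = tuple(steps)  # recorded (kind, function) pairs, not yet run

    # Transformations (lazy): only record the step and return a new ToyRDD
    def map(self, func):
        return ToyRDD(self._data, self._steps + (("map", func),))

    def filter(self, pred):
        return ToyRDD(self._data, self._steps + (("filter", pred),))

    # Action (eager): apply every recorded step, in order, and return the result
    def collect(self):
        result = list(self._data)
        for kind, func in self._steps:
            if kind == "map":
                result = [func(x) for x in result]
            else:
                result = [x for x in result if func(x)]
        return result


calls = []  # elements the map function has actually been applied to


def square(x):
    calls.append(x)
    return x * x


pipeline = ToyRDD([1, 2, 3, 4]).map(square).filter(lambda x: x > 4)
print(calls)               # [] (nothing has run yet)
print(pipeline.collect())  # [9, 16]
print(calls)               # [1, 2, 3, 4]
```

As with an uncached RDD, calling `collect()` a second time on the toy model re-runs the whole chain; in Spark this recomputation is what `cache()`/`persist()` avoids.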



### Creating RDDs

There are two ways to create RDDs from data, plus a third way to derive them from other RDDs:

- Parallelizing an existing collection of objects in your driver program
- Referencing an external dataset in an external storage system, such as a shared file system, HDFS, HBase, or any data source offering a Hadoop InputFormat, for example:
    - files in HDFS
    - objects in an Amazon S3 bucket
    - lines in a text file
- Transforming existing RDDs

In [None]:
# Loading data in PySpark

# Parallelized collection (parallelizing)

# parallelize() for creating RDDs from python lists

numRDD = sc.parallelize([1,2,3,4])

# A string is an iterable of characters, so this creates an RDD of single characters
helloRDD = sc.parallelize("Hello world")

print(type(helloRDD))


#SparkContext's parallelize() method

rdd = sc.parallelize([1,2,3,4,5])

# creating RDDs from external datasets
# SparkContext's textFile() method

rdd2 = sc.textFile("example_text.txt")

<class 'pyspark.rdd.RDD'>


In [None]:
numRDD.collect()

[1, 2, 3, 4]

In [None]:
rdd.collect()

[1, 2, 3, 4, 5]

In [None]:
rdd2.filter(lambda x: x != "").take(10)

['The Project Gutenberg EBook of Ulysses, by James Joyce',
 'This eBook is for the use of anyone anywhere at no cost and with almost',
 'no restrictions whatsoever. You may copy it, give it away or re-use',
 'it under the terms of the Project Gutenberg License included with this',
 'eBook or online at www.gutenberg.org',
 'Title: Ulysses',
 'Author: James Joyce',
 'Release Date: August 1, 2008 [EBook #4300]',
 'Last Updated: August 17, 2017',
 'Language: English']

In [None]:
rdd2.count() 

32710

In [None]:
LineLength = rdd2.map(len)
print(LineLength.count())
print(LineLength.take(5))

32710
[0, 54, 0, 71, 67]


In [None]:
rdd3 = sc.wholeTextFiles("example_text.txt", 8)

rdd3.keys().collect()


['file:/Users/vkocaman/Python_Projects/Leiden/Spark/example_text.txt']

In [None]:
rdd3.values().collect()

In [None]:
# Read Data from HDFS
# hdfs://localhost:9746 filamentData.csv
data = sc.textFile('hdfs://localhost:9746/bookData/filamentData.csv',4)
data.take(4)

# Read a file from HDFS and count the words
# https://github.com/radanalyticsio/radanalyticsio.github.io/blob/master/assets/pyspark_hdfs_notebook/PySpark_HDFS.ipynb

# Reading input from S3 with Apache Spark on OpenShift
# https://github.com/radanalyticsio/radanalyticsio.github.io/blob/master/assets/s3-source-example/s3-source-example.ipynb

In [None]:
# Creates a DataFrame based on a table named "people"
# stored in a MySQL database.
# Requires the MySQL JDBC driver jar on Spark's classpath.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

url = "jdbc:mysql://yourIP:yourPort/test"
df = spark \
  .read \
  .format("jdbc") \
  .option("url", url) \
  .option("dbtable", "people") \
  .option("user", "yourUsername") \
  .option("password", "yourPassword") \
  .load()

# Look at the schema of this DataFrame.
df.printSchema()

# Counts people by age
countsByAge = df.groupBy("age").count()
countsByAge.show()

# Saves countsByAge to S3 in the JSON format.
countsByAge.write.format("json").save("s3a://...")

In [None]:
# Save RDD Data to HDFS
playData = sc.textFile('/home/muser/bData/shakespearePlays.txt',4)
playDataLineLength = playData.map(lambda x : len(x))

# saveAsTextFile() writes one part file per partition, so this RDD (four partitions) produces part-00000 to part-00003
playDataLineLength.saveAsTextFile('hdfs://localhost:9746/savedData/')
#  hadoop fs -cat /savedData/part-00000
#  hadoop fs -cat /savedData/part-00001
#  hadoop fs -cat /savedData/part-00002
#  hadoop fs -cat /savedData/part-00003

In [None]:
# Read a CSV File
# Writing a Python Function to Parse CSV Lines
import csv
from io import StringIO

def parseCSV(csvRow):
    # Parse one CSV line into a list of fields; csv.reader handles quoted fields
    data = StringIO(csvRow)
    dataReader = csv.reader(data, lineterminator='')
    return next(dataReader)

csvRow = "p,s,r,p"
parseCSV(csvRow)

['p', 's', 'r', 'p']
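Using the `csv` module instead of a plain `str.split(",")` matters as soon as a field contains a quoted comma. A small plain-Python comparison; the sample row and the helper name `parse_csv_line` are made up for illustration:

```python
import csv
from io import StringIO


def parse_csv_line(line):
    # Same idea as parseCSV above: let csv.reader apply the quoting rules
    return next(csv.reader(StringIO(line)))


row = 'AS,"Seattle, WA",SEA'  # hypothetical row with a quoted comma

print(row.split(","))       # ['AS', '"Seattle', ' WA"', 'SEA']
print(parse_csv_line(row))  # ['AS', 'Seattle, WA', 'SEA']
```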

In [None]:
# Read a CSV file and parse each line into a list of fields
filamentRDD = sc.textFile('flights_small.csv', 4)
filamentRDDCSV = filamentRDD.map(parseCSV)
filamentRDDCSV.take(1)

[['year',
  'month',
  'day',
  'dep_time',
  'dep_delay',
  'arr_time',
  'arr_delay',
  'carrier',
  'tailnum',
  'flight',
  'origin',
  'dest',
  'air_time',
  'distance',
  'hour',
  'minute']]

In [None]:
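# Column layout of the review data: a header row followed by one list of
# values per review (the data rows are not shown in the original notebook)
rdd = [["asin", "helpful", "overall", "reviewText", "reviewTime", "reviewerID", "reviewerName", "summary", "unixReviewTime", "title", "price", "brand", "also_bought", "also_viewed", "bought_together", "salesRank", "categories", "related", "imUrl", "description"],
       # ... data rows ...
      ]
rdd_helpfull = ["asin", "reviewerID", "helpfulness_ratio"]  # columns of the second dataset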


In [None]:
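# Calling rdd_helpfull.filter(...).collect() inside rdd.map() does not work:
# RDD operations can only be invoked from the driver, not inside another RDD's function.
# Instead, key both datasets by (asin, reviewerID) and join them.
# Assumes rdd and rdd_helpfull are RDDs of lists whose first row is the header.
reviews_by_key = rdd.map(lambda x: ((x[0], x[5]), x))             # x[5] is reviewerID
ratios_by_key = rdd_helpfull.map(lambda y: ((y[0], y[1]), y[2]))  # y[2] is helpfulness_ratio

# Both header rows have the key ("asin", "reviewerID"), so the joined header
# also gets "helpfulness_ratio" appended.
rdd_joined = reviews_by_key.join(ratios_by_key) \
                           .map(lambda kv: kv[1][0] + [kv[1][1]])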

In [None]:
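def hr_to_min(x):
    # Appends the length of the carrier code (column 7) to each data row;
    # the header row gets the new column name "sum" instead
    if x[0] == "year":
        return x + ["sum"]
    else:
        return x + [len(x[7])]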


In [None]:
a=filamentRDDCSV.map(lambda x: hr_to_min(x))
#a.take(10)

In [None]:
a.collect()

[['year',
  'month',
  'day',
  'dep_time',
  'dep_delay',
  'arr_time',
  'arr_delay',
  'carrier',
  'tailnum',
  'flight',
  'origin',
  'dest',
  'air_time',
  'distance',
  'hour',
  'minute',
  'sum'],
 ['2014',
  '12',
  '8',
  '658',
  '-7',
  '935',
  '-5',
  'VX',
  'N846VA',
  '1780',
  'SEA',
  'LAX',
  '132',
  '954',
  '6',
  '58',
  2],
 ['2014',
  '1',
  '22',
  '1040',
  '5',
  '1505',
  '5',
  'AS',
  'N559AS',
  '851',
  'SEA',
  'HNL',
  '360',
  '2677',
  '10',
  '40',
  2],
 ['2014',
  '3',
  '9',
  '1443',
  '-2',
  '1652',
  '2',
  'VX',
  'N847VA',
  '755',
  'SEA',
  'SFO',
  '111',
  '679',
  '14',
  '43',
  2],
 ['2014',
  '4',
  '9',
  '1705',
  '45',
  '1839',
  '34',
  'WN',
  'N360SW',
  '344',
  'PDX',
  'SJC',
  '83',
  '569',
  '17',
  '5',
  2],
 ['2014',
  '3',
  '9',
  '754',
  '-1',
  '1015',
  '1',
  'AS',
  'N612AS',
  '522',
  'SEA',
  'BUR',
  '127',
  '937',
  '7',
  '54',
  2],
 ...]

In [None]:
filamentRDDCSV.

In [None]:
filamentRDDCSV.(filamentRDDCSV).take(5)

[('2014', ('12', '12')),
 ('2014', ('12', '1')),
 ('2014', ('12', '3')),
 ('2014', ('12', '4')),
 ('2014', ('12', '3'))]

In [None]:
filamentRDDCSV.take(5)

[['year',
  'month',
  'day',
  'dep_time',
  'dep_delay',
  'arr_time',
  'arr_delay',
  'carrier',
  'tailnum',
  'flight',
  'origin',
  'dest',
  'air_time',
  'distance',
  'hour',
  'minute'],
 ['2014',
  '12',
  '8',
  '658',
  '-7',
  '935',
  '-5',
  'VX',
  'N846VA',
  '1780',
  'SEA',
  'LAX',
  '132',
  '954',
  '6',
  '58'],
 ['2014',
  '1',
  '22',
  '1040',
  '5',
  '1505',
  '5',
  'AS',
  'N559AS',
  '851',
  'SEA',
  'HNL',
  '360',
  '2677',
  '10',
  '40'],
 ['2014',
  '3',
  '9',
  '1443',
  '-2',
  '1652',
  '2',
  'VX',
  'N847VA',
  '755',
  'SEA',
  'SFO',
  '111',
  '679',
  '14',
  '43'],
 ['2014',
  '4',
  '9',
  '1705',
  '45',
  '1839',
  '34',
  'WN',
  'N360SW',
  '344',
  'PDX',
  'SJC',
  '83',
  '569',
  '17',
  '5']]

In [None]:
# Understanding Partitioning in PySpark

# A partition is a logical division of a large distributed data set

# parallelize() method

numRDD = sc.parallelize(range(10), numSlices = 3)

#textFile() method

fileRDD = sc.textFile("example_text.txt", minPartitions = 6)

#The number of partitions in an RDD can be found by using getNumPartitions() method

print (fileRDD.getNumPartitions())

print (numRDD.getNumPartitions())

6
3


#### Anonymous functions in Python

Lambda functions are anonymous functions in Python

They are widely used in Python and are especially convenient with map() and filter()

Like def, a lambda creates a function object that can be called later

Unlike def, a lambda does not give the function a name (i.e. it is anonymous), and its body must be a single expression

Lambdas let you define a function inline, or defer the execution of some code until the function is called
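A small plain-Python illustration (no Spark needed): the `def` and `lambda` versions below compute the same thing, and the lambda is usually written inline wherever a function argument is expected, the same way it is passed to `RDD.map()` and `RDD.filter()` in the cells that follow.

```python
# A named function defined with def ...
def square(x):
    return x * x


# ... and the same logic as a lambda, bound to a name here only to compare them
square_lambda = lambda x: x * x

nums = [1, 2, 3, 4]

# Lambdas are most often passed inline, e.g. to map() and filter()
squares = list(map(lambda x: x * x, nums))
evens = list(filter(lambda x: x % 2 == 0, nums))

print(square(3), square_lambda(3))  # 9 9
print(squares)                      # [1, 4, 9, 16]
print(evens)                        # [2, 4]
print(square_lambda.__name__)       # <lambda> (the function itself has no name)
```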

In [None]:
## Transformations (lazy evaluation)

# map() Transformation

# map() transformation applies a function to all elements in the RDD

RDD = sc.parallelize([1,2,3,4])
RDD_map = RDD.map(lambda x: x * x)

RDD_map.collect()

[1, 4, 9, 16]

In [None]:
# filter() Transformation

# Filter transformation returns a new RDD with only the elements that pass the condition

RDD = sc.parallelize([1,2,3,4])

RDD_filter = RDD.filter(lambda x: x > 2)

RDD_filter.collect()

[3, 4]

In [None]:
# flatMap() Transformation

# flatMap() transformation returns multiple values for each element in the original RDD

"""
Why are we using flatMap, rather than map?

The reason is that the operation line.split(" ") generates a list of strings, 
so had we used map the result would be an RDD of lists of words. Not an RDD of words.

The difference between map and flatMap is that flatMap expects the function to return
an iterable (such as a list) for each element, and concatenates those results to form the RDD.
"""

RDD = sc.parallelize(["hello world", "how are you"])

RDD_flatmap = RDD.flatMap(lambda x: x.split(" "))

RDD_flatmap.collect()

['hello', 'world', 'how', 'are', 'you']
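The same contrast in plain Python, as a local analogy only (no Spark involved): a map-style step keeps one output per input, while a flatMap-style step flattens the per-element lists.

```python
from itertools import chain

lines = ["hello world", "how are you"]

# map-like: exactly one output per input element, giving a list of lists
mapped = [line.split(" ") for line in lines]

# flatMap-like: each element may yield several outputs, which are concatenated
flat_mapped = list(chain.from_iterable(line.split(" ") for line in lines))

print(mapped)       # [['hello', 'world'], ['how', 'are', 'you']]
print(flat_mapped)  # ['hello', 'world', 'how', 'are', 'you']
```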

In [None]:
# union() Transformation: returns all elements of both RDDs (duplicates are kept)

inputRDD = sc.textFile("example_text.txt")

money_RDD = inputRDD.filter(lambda x: "money" in x.split())
biscuit_RDD = inputRDD.filter(lambda x: "biscuit" in x.split())
combinedRDD = money_RDD.union(biscuit_RDD)

combinedRDD.collect()[:10]

['with money and indigestion. Because he comes from Oxford. You know,',
 'downstairs and touch him for a guinea. He’s stinking with money and',
 '—Would I make any money by it? Stephen asked.',
 'moved over the shells heaped in the cold stone mortar: whelks and money',
 '—Thank you, sir, Stephen said, gathering the money together with shy',
 'don’t know yet what money is. Money is power. When you have lived',
 'Shakespeare say? Put but money in thy purse.',
 '—He knew what money was, Mr Deasy said. He made money. A poet, yes,',
 'of the canteen, over the motley slush. Even money Fair Rebel. Ten to one',
 'twelve. By the way go easy with that money like a good young imbecile.']
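Unlike a set union, union() keeps duplicates: a line containing both "money" and "biscuit" would be returned once by each filter and therefore appear twice in combinedRDD. A plain-Python analogy with made-up lines:

```python
money_lines = ["He made money.", "the money and the biscuit"]
biscuit_lines = ["the money and the biscuit", "the biscuit tin"]

# Like RDD.union(): plain concatenation, so a line that matched both filters appears twice
combined = money_lines + biscuit_lines

# Like calling .distinct() afterwards: duplicates removed (Spark gives no ordering guarantee)
distinct_lines = set(combined)

print(len(combined))                                # 4
print(combined.count("the money and the biscuit"))  # 2
print(len(distinct_lines))                          # 3
```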

In [None]:
# RDD actions

# Actions return a value to the driver after running a computation on the RDD

# Basic RDD Actions:

# collect(): returns all the elements of the dataset as a list

# take(N): returns a list with the first N elements of the dataset


combinedRDD.take(3)


['with money and indigestion. Because he comes from Oxford. You know,',
 'downstairs and touch him for a guinea. He’s stinking with money and',
 '—Would I make any money by it? Stephen asked.']

In [None]:
# first(), top() and count() Actions

# first() returns the first element of the RDD

combinedRDD.first()


'with money and indigestion. Because he comes from Oxford. You know,'

In [None]:
combinedRDD.collect()[-1]

'So he went over to the biscuit tin Bob Doran left to see if there was'

In [None]:
# Take top elements
# This method should only be used if the resulting array is expected
# to be small, as all the data is loaded into the driver's memory.

# It returns the elements sorted in descending order.

print(sc.parallelize([10, 4, 2, 12, 3]).top(1))

#print(combinedRDD.top(2)) # the two largest lines, in descending string order

[12]


In [None]:
# count() returns the number of elements in the RDD

combinedRDD.count()

67

In [None]:
numbRDD = sc.parallelize([1,2,3,4])

# Create map() transformation to cube numbers
cubedRDD = numbRDD.map(lambda x: x**3)

# Collect the results
numbers_all = cubedRDD.collect()

# Print the numbers from numbers_all
for numb in numbers_all:
    print(numb)

1
8
27
64


In [None]:
# glom() returns an RDD created by coalescing all elements within each partition into a list.

# https://medium.com/parrot-prediction/partitioning-in-apache-spark-8134ad840b0

rdd=sc.parallelize(range(10), 4)

print("Number of partitions: {}".format(rdd.getNumPartitions()))
print("Partitions structure: {}".format(rdd.glom().collect()))

Number of partitions: 4
Partitions structure: [[0, 1], [2, 3, 4], [5, 6], [7, 8, 9]]


In [None]:
rdd=sc.parallelize(range(10))

print("Number of partitions: {}".format(rdd.getNumPartitions()))
print("Partitions structure: {}".format(rdd.glom().collect()))

Number of partitions: 8
Partitions structure: [[0], [1], [2], [3, 4], [5], [6], [7], [8, 9]]
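The partition boundaries shown by glom() above are consistent with a simple rule: with n elements and k slices, slice i holds the elements at positions floor(i·n/k) up to, but excluding, floor((i+1)·n/k). The plain-Python sketch below assumes that rule (it is a model, not Spark's own code) and reproduces both outputs; the 8 partitions in the second cell presumably come from the default parallelism of local[*] on an 8-core machine.

```python
def slice_positions(n, num_slices):
    """(start, end) index pairs, assuming slice i covers
    floor(i*n/k) up to (but excluding) floor((i+1)*n/k)."""
    return [((i * n) // num_slices, ((i + 1) * n) // num_slices)
            for i in range(num_slices)]


def slice_list(data, num_slices):
    data = list(data)
    return [data[start:end] for start, end in slice_positions(len(data), num_slices)]


print(slice_list(range(10), 4))  # [[0, 1], [2, 3, 4], [5, 6], [7, 8, 9]]
print(slice_list(range(10), 8))  # [[0], [1], [2], [3, 4], [5], [6], [7], [8, 9]]
print(slice_list(range(10), 3))  # [[0, 1, 2], [3, 4, 5], [6, 7, 8, 9]]
```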


### Introduction to pair RDDs in PySpark

Real-life datasets are usually key/value pairs

Each row has a key that maps to one or more values

A pair RDD is a special data structure for working with this kind of dataset

Pair RDD: the key is the identifier and the value is the data

Creating pair RDDs

Two common ways to create pair RDDs:

- From a list of key-value tuples
- From a regular RDD

Either way, the data first has to be put into key/value form.


In [None]:
my_tuple = [('Sam', 23), ('Mary', 34), ('Peter', 25)]

pairRDD_tuple = sc.parallelize(my_tuple)

my_list = ['Sam 23', 'Mary 34', 'Peter 25']

regularRDD = sc.parallelize(my_list)

pairRDD_RDD = regularRDD.map(lambda s: (s.split(' ')[0], s.split(' ')[1]))

pairRDD_RDD.collect()

[('Sam', '23'), ('Mary', '34'), ('Peter', '25')]

In [None]:
#  Fetching Values from a Paired RDD
pairRDD_RDD_Values = pairRDD_RDD.values()
pairRDD_RDD_Values.collect()

['23', '34', '25']

In [None]:
#  Fetching Keys from a Paired RDD
pairRDD_RDD_Keys = pairRDD_RDD.keys()
pairRDD_RDD_Keys.collect()

['Sam', 'Mary', 'Peter']

Transformations on pair RDDs

All regular transformations work on pair RDDs

The functions you pass must operate on (key, value) tuples rather than on individual elements

Examples of pair RDD transformations:

- reduceByKey(func): combine values with the same key
- groupByKey(): group values with the same key
- sortByKey(): return an RDD sorted by the key
- join(): join two pair RDDs based on their key
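To pin down what each of these transformations computes, here is a single-machine model in plain Python. The helper names are hypothetical, results come back as ordinary dicts and lists, and the sketch ignores partitioning (including the map-side combining that usually makes reduceByKey cheaper than groupByKey followed by a reduce).

```python
from collections import defaultdict
from functools import reduce


def group_by_key(pairs):
    """groupByKey-like: collect all values that share a key."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return dict(groups)


def reduce_by_key(pairs, func):
    """reduceByKey-like: combine the values of each key with func."""
    return {key: reduce(func, values) for key, values in group_by_key(pairs).items()}


def sort_by_key(pairs, ascending=True):
    """sortByKey-like: order (key, value) pairs by key."""
    return sorted(pairs, key=lambda kv: kv[0], reverse=not ascending)


def join(left, right):
    """join-like (inner join): one output per matching (left value, right value) pair."""
    right_groups = group_by_key(right)
    return [(key, (lv, rv)) for key, lv in left for rv in right_groups.get(key, [])]


goals = [("Messi", 23), ("Ronaldo", 34), ("Neymar", 22), ("Messi", 24)]
print(reduce_by_key(goals, lambda x, y: x + y))  # {'Messi': 47, 'Ronaldo': 34, 'Neymar': 22}
print(group_by_key([("US", "JFK"), ("UK", "LHR"), ("US", "SFO")]))  # {'US': ['JFK', 'SFO'], 'UK': ['LHR']}
print(sort_by_key([("b", 2), ("a", 1)]))  # [('a', 1), ('b', 2)]
print(join([("Messi", 34)], [("Messi", 100), ("Neymar", 120)]))  # [('Messi', (34, 100))]
```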



In [None]:
# We can also pass named (user-defined) functions to map()

def get_Squares(num):
    return num**2

numbRDD = sc.parallelize([1,2,3,4,2,5,1])

numbRDD.map(get_Squares).collect()

[1, 4, 9, 16, 4, 25, 1]

In [None]:
# finding the distinct numbers

numbRDD.distinct().collect()

[1, 2, 3, 4, 5]

In [None]:
# subtract(): elements of numbRDD that do not appear in numbRDD2

numbRDD2 = sc.parallelize([1, 2, 3])

numbRDD.subtract(numbRDD2).collect()

[4, 5]

In [None]:
# intersection(): elements present in both RDDs (duplicates removed)

numbRDD.intersection(numbRDD2).collect()

[1, 2, 3]

In [None]:
# calculating basic stats

numRDD = sc.parallelize([1, 2, 3, 4])

print(numRDD.min())

print(numRDD.max())

print(numRDD.sum())

print(numRDD.mean())

print(numRDD.variance())

print(numRDD.stdev())

print(numRDD.stats())

# Note: in this output, asDict() reports the sample (n - 1) stdev and variance
print(numRDD.stats().asDict())

1
4
10
2.5
1.25
1.118033988749895
(count: 4, mean: 2.5, stdev: 1.118033988749895, max: 4.0, min: 1.0)
{'count': 4, 'mean': 2.5, 'sum': 10.0, 'min': 1.0, 'max': 4.0, 'stdev': 1.2909944487358056, 'variance': 1.6666666666666667}
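The two different stdev/variance values above come from two definitions: stdev() and variance() divide by n (population statistics), while the numbers in this asDict() output match the sample versions that divide by n - 1. Python's standard statistics module exposes both, so the figures for [1, 2, 3, 4] can be checked locally:

```python
import statistics

data = [1, 2, 3, 4]

# Population versions (divide by n): match variance() and stdev() above
print(statistics.pvariance(data))         # 1.25
print(round(statistics.pstdev(data), 6))  # 1.118034

# Sample versions (divide by n - 1): match the asDict() values above
print(statistics.variance(data))          # 1.6666666666666667
print(round(statistics.stdev(data), 6))   # 1.290994
```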


In [None]:
# reduceByKey() transformation

# reduceByKey() transformation combines values with the same key

# It runs parallel operations for each key in the dataset

# It is a transformation, not an action

regularRDD = sc.parallelize([("Messi", 23), ("Ronaldo", 34), ("Neymar", 22), ("Messi", 24)])

pairRDD_reducebykey = regularRDD.reduceByKey(lambda x,y : x + y)

pairRDD_reducebykey.collect()


[('Ronaldo', 34), ('Neymar', 22), ('Messi', 47)]

In [None]:
# sortByKey() transformation

# sortByKey() operation orders pair RDD by key

#It returns an RDD sorted by key in ascending or descending order

pairRDD_reducebykey_rev = pairRDD_reducebykey.map(lambda x: (x[1], x[0]))

pairRDD_reducebykey_rev.sortByKey(ascending=False).collect()


[(47, 'Messi'), (34, 'Ronaldo'), (22, 'Neymar')]

In [None]:
# groupByKey() transformation

# groupByKey() groups all the values with the same key in the pair RDD

airports = [("US", "JFK"),("UK", "LHR"),("FR", "CDG"),("US", "SFO")]

regularRDD = sc.parallelize(airports)

pairRDD_group = regularRDD.groupByKey().collect()

for cont, air in pairRDD_group:
    print(cont, list(air))


FR ['CDG']
UK ['LHR']
US ['JFK', 'SFO']


In [None]:
# join() transformation

# join() transformation joins the two pair RDDs based on their key

RDD1 = sc.parallelize([("Messi", 34),("Ronaldo", 32),("Neymar", 24)])

RDD2 = sc.parallelize([("Ronaldo", 80),("Neymar", 120),("Messi", 100)])

RDD1.join(RDD2).collect()


[('Neymar', (24, 120)), ('Ronaldo', (32, 80)), ('Messi', (34, 100))]

In [None]:
# reduce() action

# reduce(func) action is used for aggregating the elements of a regular RDD

# The function should be commutative and associative

# An example of reduce() action in PySpark

x = [1,3,4,6]
RDD = sc.parallelize(x)
RDD.reduce(lambda x, y : x + y)

14
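Why the function must be commutative and associative: Spark first reduces each partition and then combines the per-partition results, so how the operations are grouped depends on how the data is partitioned. A simplified plain-Python model of that two-step scheme (the helper `partitioned_reduce` is hypothetical, and it combines partial results in partition order, which Spark does not promise in general):

```python
from functools import reduce


def partitioned_reduce(partitions, func):
    """Simplified model: reduce each partition, then reduce the partial results."""
    partials = [reduce(func, part) for part in partitions if part]  # skip empty partitions
    return reduce(func, partials)


add = lambda a, b: a + b
sub = lambda a, b: a - b

# Addition is associative and commutative: the split does not matter
print(partitioned_reduce([[1, 3], [4, 6]], add))  # 14
print(partitioned_reduce([[1], [3, 4, 6]], add))  # 14

# Subtraction is not: the answer depends on how the data is partitioned
print(partitioned_reduce([[1, 3], [4, 6]], sub))  # 0
print(partitioned_reduce([[1], [3, 4, 6]], sub))  # 8
```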

In [None]:
# saveAsTextFile() action
# saveAsTextFile() action saves the RDD as text files inside a directory, one file per partition

RDD.saveAsTextFile("tempFile")

! cd tempFile && ls -l


total 32
-rw-r--r--  1 vkocaman  staff  0 Jan  3 14:34 _SUCCESS
-rw-r--r--  1 vkocaman  staff  0 Jan  3 14:34 part-00000
-rw-r--r--  1 vkocaman  staff  2 Jan  3 14:34 part-00001
-rw-r--r--  1 vkocaman  staff  0 Jan  3 14:34 part-00002
-rw-r--r--  1 vkocaman  staff  2 Jan  3 14:34 part-00003
-rw-r--r--  1 vkocaman  staff  0 Jan  3 14:34 part-00004
-rw-r--r--  1 vkocaman  staff  2 Jan  3 14:34 part-00005
-rw-r--r--  1 vkocaman  staff  0 Jan  3 14:34 part-00006
-rw-r--r--  1 vkocaman  staff  2 Jan  3 14:34 part-00007


In [None]:
# coalesce(1) merges the RDD into a single partition, so it is saved as a single text file

! rm -r tempFile # remove the old folder first: saveAsTextFile() fails if the output directory already exists

RDD.coalesce(1).saveAsTextFile("tempFile")

! cd tempFile && ls -l

total 8
-rw-r--r--  1 vkocaman  staff  0 Jan  3 14:35 _SUCCESS
-rw-r--r--  1 vkocaman  staff  8 Jan  3 14:35 part-00000


Action Operations on pair RDDs

RDD actions available for PySpark pair RDDs

Pair RDD actions leverage the key-value data

A few examples of pair RDD actions:

- countByKey()

- collectAsMap()

In [None]:
# countByKey() action

# countByKey() only available for type (K, V)

# countByKey() action counts the number of elements for each key

# Example of countByKey() on a simple list

rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])

for key, val in rdd.countByKey().items():
    print(key, val)


a 2
b 1


In [None]:
# collectAsMap() action

# collectAsMap() returns the key-value pairs in the RDD as a dictionary

# Example of collectAsMap() on a list of tuples

sc.parallelize([(1, 2), (3, 4)]).collectAsMap()


{1: 2, 3: 4}

In [None]:
# word count example

text_file = sc.textFile("example_text.txt")
counts_rdd = text_file.flatMap(lambda line: line.split(" ")) \
             .map(lambda word: (word, 1)) \
             .reduceByKey(lambda a, b: a + b)

# print the word frequencies in descending order

counts_rdd.map(lambda x: (x[1], x[0])) \
    .sortByKey(ascending=False)\
    .collect()[:20]


[(13609, 'the'),
 (10549, ''),
 (8134, 'of'),
 (6551, 'and'),
 (5841, 'a'),
 (4788, 'to'),
 (4619, 'in'),
 (3034, 'his'),
 (2712, 'he'),
 (2430, 'I'),
 (2391, 'with'),
 (2169, 'that'),
 (2006, 'was'),
 (1894, 'on'),
 (1791, 'for'),
 (1680, 'it'),
 (1505, 'her'),
 (1363, 'you'),
 (1246, 'is'),
 (1217, 'at')]
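The second most frequent "word" above is the empty string: line.split(" ") returns '' for every blank line and for each extra space between words. A small plain-Python check (toy lines, with collections.Counter standing in for the map/reduceByKey pair) shows that calling split() with no argument avoids this:

```python
from collections import Counter

lines = ["the cat  sat", "", "the end"]  # note the double space and the blank line

# split(" ") keeps empty strings for repeated spaces and blank lines ...
counts_space = Counter(word for line in lines for word in line.split(" "))

# ... while split() with no argument splits on runs of whitespace and drops them
counts_ws = Counter(word for line in lines for word in line.split())

print(counts_space[""])  # 2
print("" in counts_ws)   # False
print(counts_ws["the"])  # 2
```

In the Spark version, the same change would be flatMap(lambda line: line.split()).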

Spark can also be used for compute-intensive tasks. This code estimates π by "throwing darts" at a circle. We pick random points in the unit square ((0, 0) to (1, 1)) and see how many fall inside the unit circle, i.e. in the quarter of it that lies within the square. That fraction should be close to π / 4, so four times the fraction is our estimate of π.

Note: If a circle of radius R is inscribed inside a square with side length 2R, then the area of the circle will be pi*R^2 and the area of the square will be (2R)^2. So the ratio of the area of the circle to the area of the square will be pi/4. This means that, if you pick N points at random inside the square, approximately N*pi/4 of those points should fall inside the circle.

The "Monte Carlo Method" is a method of solving problems using statistics. Given the probability, P, that an event will occur in certain conditions, a computer can be used to generate those conditions repeatedly. The number of times the event occurs divided by the number of times the conditions are generated should be approximately equal to P.
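A worked version of the estimate, assuming the N points $(x_i, y_i)$ are drawn independently and uniformly from the unit square: each point lands inside the circle with probability $p$, so the count is binomial and the standard error of the estimate follows directly.

$$
p = P\left(x^2 + y^2 < 1\right) = \frac{\pi}{4}, \qquad
\hat{\pi} = \frac{4\,\text{count}}{N}, \qquad
\operatorname{SE}(\hat{\pi}) = 4\sqrt{\frac{p\,(1 - p)}{N}}
$$

For $N = 100000$ this gives $\operatorname{SE} \approx 4\sqrt{0.7854 \times 0.2146 / 100000} \approx 0.0052$, so a result like the 3.135 printed below lies within about 1.3 standard errors of π. Because the error shrinks like $1/\sqrt{N}$, each extra decimal digit of accuracy costs roughly 100 times more samples.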


In [None]:
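# Estimate pi with the Monte Carlo method
import random

def inside_circle(_):
    # The element value is ignored: each call draws a fresh random point in the unit square
    x, y = random.random(), random.random()
    return x*x + y*y < 1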

NUM_SAMPLES = 100000

count = sc.parallelize(range(0, NUM_SAMPLES)) \
             .filter(inside_circle).count()

print ("Pi is roughly %f" % (4.0 * count / NUM_SAMPLES))

Pi is roughly 3.135040


#### Bigrams and word frequencies

For a slightly more complicated task, let's look at splitting the sentences in our document into word bigrams. A bigram is a pair of successive tokens in some sequence. We will build bigrams from the sequence of words in each sentence, and then find the most frequently occurring ones.

The first problem is that the elements of our initial RDD are lines from the file rather than sentences, and a sentence may be split over multiple lines. The glom() RDD method turns each partition into a single list containing all of its lines; we can then join those lines up and re-split them into sentences using "." as the separator, using flatMap so that every element of the new RDD is a sentence. (A sentence that straddles a partition boundary will still be cut in two.)

Now that we have isolated each sentence, we can split it into a list of words and extract the word bigrams from it. Our new RDD contains tuples whose first value is the word bigram (itself a tuple of the first and second word) and whose second value is the number 1.

Finally, we can apply the same reduceByKey and sort steps that we used in the word count example to count the bigrams and sort them in order of descending frequency. In reduceByKey, the key is now a bigram rather than an individual word.
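The same pipeline on a single machine, in plain Python: split a toy text into sentences on ".", build bigrams from each sentence with zip (equivalent to the index-based comprehension in the Spark cell), and count them with collections.Counter, which plays the role of reduceByKey here. The helper name `bigrams` and the sample text are made up for illustration.

```python
from collections import Counter


def bigrams(sentence):
    """Pairs of successive words, i.e. (x[i], x[i+1]) for i in range(len(x) - 1)."""
    words = sentence.split()
    return list(zip(words, words[1:]))


text = "the cat sat on the mat. the cat ran"  # toy text, not from example_text.txt
counts = Counter(bg for sentence in text.split(".") for bg in bigrams(sentence))

print(bigrams("the cat sat"))  # [('the', 'cat'), ('cat', 'sat')]
print(counts[("the", "cat")])  # 2
print(counts[("mat", "the")])  # 0 (bigrams never cross a sentence boundary)
```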



In [None]:
# bigrams and word frequencies

sentences = sc.textFile("example_text.txt") \
    .glom() \
    .map(lambda x: " ".join(x)) \
    .flatMap(lambda x: x.split("."))

bigrams = sentences.map(lambda x:x.split()) \
    .flatMap(lambda x: [((x[i],x[i+1]),1) for i in range(0,len(x)-1)])

freq_bigrams = bigrams.reduceByKey(lambda x,y:x+y) \
    .map(lambda x:(x[1],x[0])) \
    .sortByKey(False)

freq_bigrams.take(10)

# http://www.mccarroll.net/blog/pyspark2/index.html

[(1635, ('of', 'the')),
 (1384, ('in', 'the')),
 (657, ('on', 'the')),
 (609, ('to', 'the')),
 (460, ('and', 'the')),
 (401, ('of', 'a')),
 (360, ('at', 'the')),
 (345, ('for', 'the')),
 (325, ('from', 'the')),
 (323, ('with', 'the'))]

In [None]:
sentences.take(3)

[' The Project Gutenberg EBook of Ulysses, by James Joyce  This eBook is for the use of anyone anywhere at no cost and with almost no restrictions whatsoever',
 ' You may copy it, give it away or re-use it under the terms of the Project Gutenberg License included with this eBook or online at www',
 'gutenberg']

## PySpark DataFrames

If you’re used to working with Pandas or data frames in R, you’ll probably also have expected to see a header, but there is none. To make your life easier, you will move on from the RDD and convert it to a DataFrame. DataFrames are preferred over RDDs whenever you can use them; especially when you’re working with Python, DataFrames generally perform better than RDDs.

But what is the difference between the two?

You can use RDDs when you want to perform low-level transformations and actions on your unstructured data. This means that you don’t care about imposing a schema while processing, or about accessing attributes by name or column. Relatedly, by choosing RDDs you give up the performance benefits that DataFrames can offer for (semi-)structured data. Use RDDs when you want to manipulate the data with functional programming constructs rather than domain-specific expressions.

To recapitulate, you’ll switch to DataFrames now to use high-level expressions, to perform SQL queries to explore your data further and to gain columnar access.

In Apache Spark, a DataFrame is a distributed collection of rows under named columns. It is conceptually equivalent to a table in a relational database, an Excel sheet with column headers, or a data frame in R/Python, but with richer optimizations under the hood. DataFrames can be constructed from a wide array of sources such as: structured data files, tables in Hive, external databases, or existing RDDs.

It also shares some common characteristics with RDD:

Immutable in nature: once a DataFrame / RDD is created it cannot be changed; applying a transformation produces a new DataFrame / RDD.

Lazy evaluation: a task is not executed until an action is performed.

Distributed: RDD and DataFrame both are distributed in nature.

Advantages of the DataFrame:

DataFrames are designed for processing large collections of structured or semi-structured data.

Observations in a Spark DataFrame are organised under named columns, which helps Apache Spark understand the schema of a DataFrame. This helps Spark optimize the execution plan for queries on it.

DataFrames in Apache Spark can scale to petabytes of data.

DataFrames support a wide range of data formats and sources.

It has API support for different languages like Python, R, Scala, Java.


### Using DataFrames

The Spark DataFrame was designed to behave a lot like a SQL table (a table with variables in the columns and observations in the rows). Not only are they easier to understand, DataFrames are also more optimized for complicated operations than RDDs.

When you start modifying and combining columns and rows of data, there are many ways to arrive at the same result, but some often take much longer than others. When using RDDs, it's up to the data scientist to figure out the right way to optimize the query, but the DataFrame implementation has much of this optimization built in!

To start working with Spark DataFrames, you first have to create a SparkSession object, which wraps a SparkContext (reusing the existing one if there is one). You can think of the SparkContext as your connection to the cluster and the SparkSession as your interface with that connection.

PySpark SQL is a Spark library for structured data. It gives Spark more information about the structure of both the data and the computation being performed.

A DataFrame is an immutable distributed collection of data with named columns

Designed for processing both structured (e.g. relational database) and semi-structured data (e.g. JSON)

The DataFrame API is available in Java, Scala, Python, and R

DataFrames in PySpark support both SQL queries (SELECT * FROM table) and expression methods (df.select())


#### SparkSession - Entry point for DataFrame API

SparkContext is the main entry point for creating RDDs

SparkSession provides a single point of entry to interact with Spark DataFrames

SparkSession is used to create DataFrames, register them as tables, and execute SQL queries


In [None]:
from pyspark.sql import SparkSession

ss = SparkSession.builder.appName('SDDM_2').getOrCreate()

In [None]:
ss.version

'2.4.0'

#### Creating DataFrames in PySpark

Two different methods of creating DataFrames in PySpark

From existing RDDs using SparkSession's createDataFrame() method

From various data sources (CSV, JSON, TXT) using SparkSession's read method

Schema controls the data and helps DataFrames to optimize queries

Schema provides information about column names, the type of data in each column, whether a column may contain null values, etc.

In [None]:
# Create a DataFrame from an RDD

iphones_RDD = sc.parallelize([
    ("XS", 2018, 5.65, 2.79, 6.24),
    ("XR", 2018, 5.94, 2.98, 6.84),
    ("X10", 2017, 5.65, 2.79, 6.13),
    ("8Plus", 2017, 6.23, 3.07, 7.12)
])

names = [ 'Model',
          'Year',
          'Height',
          'Width',
          'Weight'
]

iphones_df = ss.createDataFrame(iphones_RDD, schema=names)

type(iphones_df)


pyspark.sql.dataframe.DataFrame

In [None]:
!wget -q -O - https://s3.amazonaws.com/nyc-tlc/trip+data/fhv_tripdata_2017-06.csv | head -n 5000 > tmp.csv

Unnamed: 0,Country,Region
0,Algeria,AFRICA
1,Angola,AFRICA
2,Benin,AFRICA
3,Botswana,AFRICA
4,Burkina,AFRICA
5,Burundi,AFRICA
6,Cameroon,AFRICA
7,Cape Verde,AFRICA
8,Central African Republic,AFRICA
9,Chad,AFRICA


In [None]:
# Create a DataFrame by reading a CSV, JSON or plain-text file

# read.csv() takes the path to the file plus two optional parameters:
# header=True uses the first line as column names,
# inferSchema=True asks Spark to guess the column types
df_csv = ss.read.csv("people.csv", 
                    header=True, inferSchema=True)

# JSON records carry their own field names and Spark infers the types,
# so read.json() needs neither option
df_json = ss.read.json("people.json")

# read.text() returns a single string column named "value", one row per line
df_txt = ss.read.text("people.txt")

# for more information about df.persist(StorageLevel.MEMORY_AND_DISK_SER) see below
# https://blog.insightdatascience.com/using-jupyter-on-apache-spark-step-by-step-with-a-terabyte-of-reddit-data-ef4d6c13959a


In [None]:
df_csv = ss.read.csv("airports.csv", 
                    header=True, inferSchema=True)

# printSchema() operation prints the types of columns in the DataFrame

df_csv.printSchema()

root
 |-- faa: string (nullable = true)
 |-- name: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- lon: double (nullable = true)
 |-- alt: integer (nullable = true)
 |-- tz: integer (nullable = true)
 |-- dst: string (nullable = true)



### DataFrame operators in PySpark

DataFrame operations: Transformations and Actions

DataFrame Transformations:

select(), filter(), groupBy(), orderBy(), dropDuplicates() and withColumnRenamed()

DataFrame Actions:

printSchema(), head(), show(), count(), columns (an attribute, not a method) and describe()
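Transformations are lazy: Spark only records them in a plan, and nothing is computed until an action such as show() or count() requests a result. The following is a rough plain-Python analogy using generator expressions. It is only an analogy, since Spark additionally optimizes the whole plan before running it. The function name times_ten is hypothetical.

```python
calls = []

def times_ten(x):
    calls.append(x)   # record every time the function actually runs
    return x * 10

data = [1, 2, 3, 4]

# "transformations": describe the pipeline; generator expressions run nothing yet
pipeline = (times_ten(x) for x in data)
pipeline = (y for y in pipeline if y > 15)
calls_before_action = len(calls)

# "action": consuming the pipeline finally triggers the work
result = list(pipeline)

print(calls_before_action)  # 0
print(result)               # [20, 30, 40]
```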

In [None]:
df_csv.show(5)

+---+--------------------+----------+-----------+----+---+---+
|faa|                name|       lat|        lon| alt| tz|dst|
+---+--------------------+----------+-----------+----+---+---+
|04G|   Lansdowne Airport|41.1304722|-80.6195833|1044| -5|  A|
|06A|Moton Field Munic...|32.4605722|-85.6800278| 264| -5|  A|
|06C| Schaumburg Regional|41.9893408|-88.1012428| 801| -6|  A|
|06N|     Randall Airport| 41.431912|-74.3915611| 523| -5|  A|
|09J|Jekyll Island Air...|31.0744722|-81.4277778|  11| -4|  A|
+---+--------------------+----------+-----------+----+---+---+
only showing top 5 rows



In [None]:
df_csv.count()

1397

In [None]:
# select() and show() operations

# select() transformation subsets the columns in the DataFrame

df_id_name = df_csv.select('name')

# show() action prints the first 20 rows by default; pass a number to print fewer

df_id_name.show(3)

+--------------------+
|                name|
+--------------------+
|   Lansdowne Airport|
|Moton Field Munic...|
| Schaumburg Regional|
+--------------------+
only showing top 3 rows



In [None]:
# filter() transformation keeps only the rows that satisfy a condition

new_df = df_csv.filter(df_csv.alt < 100)

new_df.show(3)

+---+--------------------+----------+------------+---+---+---+
|faa|                name|       lat|         lon|alt| tz|dst|
+---+--------------------+----------+------------+---+---+---+
|09J|Jekyll Island Air...|31.0744722| -81.4277778| 11| -4|  A|
|1RL|Point Roberts Air...|48.9797222|-123.0788889| 10| -7|  A|
|369|  Atmautluak Airport| 60.866667| -162.273056| 18|-10|  A|
+---+--------------------+----------+------------+---+---+---+
only showing top 3 rows



In [None]:
# we can also use brackets (as in Pandas) instead of filter()

df_csv[df_csv.alt < 100].show(3)

+---+--------------------+----------+------------+---+---+---+
|faa|                name|       lat|         lon|alt| tz|dst|
+---+--------------------+----------+------------+---+---+---+
|09J|Jekyll Island Air...|31.0744722| -81.4277778| 11| -4|  A|
|1RL|Point Roberts Air...|48.9797222|-123.0788889| 10| -7|  A|
|369|  Atmautluak Airport| 60.866667| -162.273056| 18|-10|  A|
+---+--------------------+----------+------------+---+---+---+
only showing top 3 rows



In [None]:

df_csv[(df_csv.alt < 100) & (df_csv["dst"] != 'A')].show(3)

# df_csv[(df_csv.alt < 100) & (df_csv.dst != 'A')].show(3)

+---+--------------------+----------+-----------+---+---+---+
|faa|                name|       lat|        lon|alt| tz|dst|
+---+--------------------+----------+-----------+---+---+---+
|60J|Ocean Isle Beach ...|33.9085056|-78.4366722| 32| -5|  U|
|HHH|         Hilton Head|    32.216|    -80.752| 10| -5|  U|
|HNL|       Honolulu Intl| 21.318681|-157.922428| 13|-10|  N|
+---+--------------------+----------+-----------+---+---+---+
only showing top 3 rows
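Both conditions in the cell above are wrapped in parentheses. This matters because Python's & operator binds more tightly than comparisons such as < and !=. The standard-library ast module shows how Python parses the expression with and without parentheses. The code only parses the text, so no Spark session is needed.

```python
import ast

# Without parentheses: & binds tighter than < and !=, so Python reads this as
# the chained comparison  df_csv.alt < (100 & df_csv.dst) != 'A'
bad = ast.parse("df_csv.alt < 100 & df_csv.dst != 'A'", mode="eval").body

# With parentheses: two separate comparisons combined by the & operator
good = ast.parse("(df_csv.alt < 100) & (df_csv.dst != 'A')", mode="eval").body

print(type(bad).__name__, [type(op).__name__ for op in bad.ops])  # Compare ['Lt', 'NotEq']
print(type(good).__name__, type(good.op).__name__)                # BinOp BitAnd
```

Without parentheses, the expression becomes a chained comparison, which makes Python call bool() on a Column. PySpark typically rejects that with a ValueError that asks you to use & and | when building boolean expressions.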



In [None]:
# groupby() (an alias of groupBy()) groups the rows by the values of one or more columns

df_csv_group = df_csv.groupby('dst')

df_csv_group.count().show(3)

+---+-----+
|dst|count|
+---+-----+
|  U|   45|
|  A| 1329|
|  N|   23|
+---+-----+



In [None]:
# orderBy() sorts the DataFrame based on one or more columns

df_csv_group.count().orderBy('dst').show(3)

+---+-----+
|dst|count|
+---+-----+
|  A| 1329|
|  N|   23|
|  U|   45|
+---+-----+



In [None]:
# dropDuplicates() removes the duplicate rows of a DataFrame

df_no_dup = df_csv.select('alt', 'dst').dropDuplicates()

df_no_dup.count()


911

In [None]:
# withColumnRenamed() renames a column in the DataFrame

df_csv_alt = df_csv.withColumnRenamed('alt', 'altitude')

df_csv_alt.show(3)

+---+--------------------+----------+-----------+--------+---+---+
|faa|                name|       lat|        lon|altitude| tz|dst|
+---+--------------------+----------+-----------+--------+---+---+
|04G|   Lansdowne Airport|41.1304722|-80.6195833|    1044| -5|  A|
|06A|Moton Field Munic...|32.4605722|-85.6800278|     264| -5|  A|
|06C| Schaumburg Regional|41.9893408|-88.1012428|     801| -6|  A|
+---+--------------------+----------+-----------+--------+---+---+
only showing top 3 rows



In [None]:
# creating a new column from existing ones

df_csv_alt = df_csv_alt.withColumn('tzxaltitude', df_csv_alt['tz'] * df_csv_alt['altitude'])
# this cell was not run when the outputs below were produced

In [None]:
# dropping a column (unlike pandas' drop(), there is no axis argument)

df_csv_alt = df_csv_alt.drop("tz")

df_csv_alt.show(3)

+---+--------------------+----------+-----------+--------+---+
|faa|                name|       lat|        lon|altitude|dst|
+---+--------------------+----------+-----------+--------+---+
|04G|   Lansdowne Airport|41.1304722|-80.6195833|    1044|  A|
|06A|Moton Field Munic...|32.4605722|-85.6800278|     264|  A|
|06C| Schaumburg Regional|41.9893408|-88.1012428|     801|  A|
+---+--------------------+----------+-----------+--------+---+
only showing top 3 rows



In [None]:
# the columns attribute returns the column names of a DataFrame as a list

df_csv.columns


['faa', 'name', 'lat', 'lon', 'alt', 'tz', 'dst']

In [None]:
# describe() computes summary statistics of the selected columns in the DataFrame

df_csv.select('lat', 'lon', 'alt').describe().show()


+-------+------------------+------------------+------------------+
|summary|               lat|               lon|               alt|
+-------+------------------+------------------+------------------+
|  count|              1397|              1397|              1397|
|   mean| 41.75029635989892|-103.6891285724532|1005.9169649248389|
| stddev|10.549872185047212|30.125313702028542|1521.2701426664623|
|    min|         19.721375|          -176.646|               -54|
|    max|         72.270833|         174.11362|              9078|
+-------+------------------+------------------+------------------+



## Interacting with DataFrames using PySpark SQL

DataFrame API vs SQL queries

In PySpark, you can interact with Spark SQL through both the DataFrame API and SQL queries.

The DataFrame API provides a programmatic domain-specific language (DSL) for working with data.

DataFrame transformations and actions are easier to construct programmatically.

SQL queries can be more concise, easier to understand, and more portable.

The operations on DataFrames can also be done using SQL queries

### SQL notes

A SQL query returns a table derived from one or more tables contained in a database.

Every SQL query is made up of commands that tell the database what you want to do with the data. The two commands that every query has to contain are SELECT and FROM.

The SELECT command is followed by the columns you want in the resulting table.

The FROM command is followed by the name of the table that contains those columns. The minimal SQL query is:

SELECT * FROM my_table;

The * selects all columns, so this returns the entire table named my_table.

Similar to .withColumn(), you can do column-wise computations within a SELECT statement. For example,

SELECT origin, dest, air_time / 60 FROM flights;

returns a table with the origin, destination, and duration in hours for each flight.
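These statements are standard SQL, so they can be tried without a cluster. The following is a minimal sketch that uses Python's built-in sqlite3 module and a hypothetical three-row flights table. The rows are made up for illustration. In PySpark, the same statements run through ss.sql() once a DataFrame has been registered with createOrReplaceTempView().

```python
import sqlite3

# in-memory database with a tiny, made-up flights table
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE flights (origin TEXT, dest TEXT, air_time REAL)")
conn.executemany(
    "INSERT INTO flights VALUES (?, ?, ?)",
    [("SEA", "LAX", 132.0), ("SEA", "HNL", 360.0), ("PDX", "SJC", 83.0)],
)

# SELECT * returns every column of every row
all_rows = conn.execute("SELECT * FROM flights").fetchall()

# column-wise computation inside SELECT: flight duration in hours
hours = conn.execute("SELECT origin, dest, air_time / 60 FROM flights").fetchall()

for row in hours:
    print(row)
```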

Another commonly used command is WHERE. This command filters the rows of the table based on some logical condition you specify. The resulting table contains the rows where your condition is true. For example, if you had a table of students and grades you could do:

SELECT * FROM students
WHERE grade = 'A';

to select all the columns and the rows containing information about students who got As.
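The WHERE example can be run the same way with sqlite3. The students and grades below are hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE students (name TEXT, grade TEXT)")
conn.executemany(
    "INSERT INTO students VALUES (?, ?)",
    [("Ana", "A"), ("Ben", "B"), ("Chen", "A")],  # hypothetical rows
)

# WHERE keeps only the rows for which the condition is true
top_students = conn.execute("SELECT * FROM students WHERE grade = 'A'").fetchall()
print(top_students)
```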


Another common database task is aggregation. That is, reducing your data by breaking it into chunks and summarizing each chunk.

This is done in SQL using the GROUP BY command. This command breaks your data into groups and applies a function from your SELECT statement to each group.

For example, if you wanted to count the number of flights from each of the two origin airports, you could use the query

SELECT COUNT(*) FROM flights
GROUP BY origin;

GROUP BY origin tells SQL that you want the output to have a row for each unique value of the origin column. The SELECT statement selects the values you want to populate each of the columns. Here, we want to COUNT() every row in each of the groups.

It's possible to GROUP BY more than one column. When you do this, the resulting table has a row for every combination of the unique values in each column. The following query counts the number of flights from SEA and PDX to every destination airport:

SELECT origin, dest, COUNT(*) FROM flights
GROUP BY origin, dest;

The output will have a row for every combination of the values in origin and dest (i.e. a row listing each origin and destination that a flight flew to). There will also be a column with the COUNT() of all the rows in each group.
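A small sqlite3 sketch of both GROUP BY queries on made-up rows follows. Unlike the first query in the text, it also selects origin, so that each count is labeled with its group.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE flights (origin TEXT, dest TEXT)")
conn.executemany(
    "INSERT INTO flights VALUES (?, ?)",
    [("SEA", "LAX"), ("SEA", "LAX"), ("SEA", "HNL"), ("PDX", "SJC")],  # hypothetical
)

# one output row per distinct origin
by_origin = conn.execute(
    "SELECT origin, COUNT(*) FROM flights GROUP BY origin"
).fetchall()

# one output row per distinct (origin, dest) combination
by_route = conn.execute(
    "SELECT origin, dest, COUNT(*) FROM flights GROUP BY origin, dest"
).fetchall()

print(sorted(by_origin))
print(sorted(by_route))
```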

Another very common data operation is the join. Joins are a whole topic unto themselves, so here we'll just look at simple joins.

A join will combine two different tables along a column that they share. This column is called the key. Examples of keys here include the tailnum and carrier columns from the flights table.

For example, suppose that you want to know more information about the plane that flew a flight than just the tail number. This information isn't in the flights table because the same plane flies many different flights over the course of two years, so including this information in every row would result in a lot of duplication. To avoid this, you'd have a second table that has only one row for each plane and whose columns list all the information about the plane, including its tail number. You could call this table planes.

When you join the flights table to this table of airplane information, you're adding all the columns from the planes table to the flights table. To fill these columns with information, you'll look at the tail number from the flights table and find the matching one in the planes table, and then use that row to fill out all the new columns.

Now you'll have a much bigger table than before, but now every row has all information about the plane that flew that flight!
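The flights and planes join described above can be sketched with sqlite3. Both tables are hypothetical: the flight numbers are placeholders and the model names (model_1, model_2) are invented. The point is that a single planes row fills in the plane columns for every flight that shares its tail number.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE flights (flight INTEGER, tailnum TEXT)")
conn.execute("CREATE TABLE planes (tailnum TEXT, model TEXT)")
conn.executemany("INSERT INTO flights VALUES (?, ?)",
                 [(1, "N846VA"), (2, "N559AS"), (3, "N846VA")])
conn.executemany("INSERT INTO planes VALUES (?, ?)",
                 [("N846VA", "model_1"), ("N559AS", "model_2")])

# tailnum is the key: each flight row picks up the columns of its matching plane row
joined = conn.execute(
    """
    SELECT flights.flight, flights.tailnum, planes.model
    FROM flights
    JOIN planes ON flights.tailnum = planes.tailnum
    """
).fetchall()

for row in joined:
    print(row)
```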



In [None]:
# Executing SQL queries

# The SparkSession sql() method executes a SQL query

# sql() takes a SQL statement as an argument and returns the result as a DataFrame

# createOrReplaceTempView() registers df_csv as a temporary view named table1 that SQL can refer to
df_csv.createOrReplaceTempView("table1")

df2 = ss.sql("SELECT name, lat, lon, dst, alt FROM table1 WHERE alt > 100")

df2.show(5)

+--------------------+----------+-----------+---+----+
|                name|       lat|        lon|dst| alt|
+--------------------+----------+-----------+---+----+
|   Lansdowne Airport|41.1304722|-80.6195833|  A|1044|
|Moton Field Munic...|32.4605722|-85.6800278|  A| 264|
| Schaumburg Regional|41.9893408|-88.1012428|  A| 801|
|     Randall Airport| 41.431912|-74.3915611|  A| 523|
|Elizabethton Muni...|36.3712222|-82.1734167|  A|1593|
+--------------------+----------+-----------+---+----+
only showing top 5 rows



In [None]:
df2.columns

['name', 'lat', 'lon', 'dst', 'alt']

In [None]:
# Summarizing and grouping data using SQL queries

df2.createOrReplaceTempView("table2")

query = 'SELECT dst, max(alt) FROM table2 GROUP BY dst'

ss.sql(query).show(5)

+---+--------+
|dst|max(alt)|
+---+--------+
|  U|    6548|
|  A|    9078|
|  N|    7015|
+---+--------+



In [None]:
ss.catalog.listTables()

[Table(name='table1', database=None, description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='table2', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]

In [None]:
df = ss.read.csv("flights_small.csv", inferSchema=True, header=True)

df.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- dep_time: string (nullable = true)
 |-- dep_delay: string (nullable = true)
 |-- arr_time: string (nullable = true)
 |-- arr_delay: string (nullable = true)
 |-- carrier: string (nullable = true)
 |-- tailnum: string (nullable = true)
 |-- flight: integer (nullable = true)
 |-- origin: string (nullable = true)
 |-- dest: string (nullable = true)
 |-- air_time: string (nullable = true)
 |-- distance: integer (nullable = true)
 |-- hour: string (nullable = true)
 |-- minute: string (nullable = true)



In [None]:
df.createOrReplaceTempView("flights")

query = "FROM flights SELECT * LIMIT 10"

# Get the first 10 rows of flights
flights10 = ss.sql(query)

# Show the results
flights10.show()

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|2014|   12|  8|     658|       -7|     935|       -5|     VX| N846VA|  1780|   SEA| LAX|     132|     954|   6|    58|
|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA| HNL|     360|    2677|  10|    40|
|2014|    3|  9|    1443|       -2|    1652|        2|     VX| N847VA|   755|   SEA| SFO|     111|     679|  14|    43|
|2014|    4|  9|    1705|       45|    1839|       34|     WN| N360SW|   344|   PDX| SJC|      83|     569|  17|     5|
|2014|    3|  9|     754|       -1|    1015|        1|     AS| N612AS|   522|   SEA| BUR|     127|     937|   7|    54|
|2014|    1| 15|    1037|        7|    1

### Pandafy a Spark DataFrame

In [None]:
query = "SELECT origin, dest, COUNT(*) as N FROM flights GROUP BY origin, dest"

# Run the query
flight_counts = ss.sql(query)

# Convert the results to a pandas DataFrame
pd_counts = flight_counts.toPandas()

# Print the head of pd_counts
print(pd_counts.head())

  origin dest    N
0    SEA  RNO    8
1    SEA  DTW   98
2    SEA  CLE    2
3    SEA  LAX  450
4    PDX  SEA  144


#### Put some Spark in your data

In [None]:
import numpy as np
import pandas as pd

In [None]:
# Create pd_temp
pd_temp = pd.DataFrame(np.random.random(10))

# Create spark_temp from pd_temp
spark_temp = ss.createDataFrame(pd_temp)

# Examine the tables in the catalog
ss.catalog.listTables()

[Table(name='flights', database=None, description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='table1', database=None, description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='table2', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]

In [None]:
# Add spark_temp to the catalog
spark_temp.createOrReplaceTempView('temp')

# Examine the tables in the catalog again
ss.catalog.listTables()

[Table(name='flights', database=None, description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='table1', database=None, description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='table2', database=None, description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='temp', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]

In [None]:
file_path = "airports.csv"

# Read in the airports data
airports = ss.read.csv(file_path, header=True)

# Show the data
airports.show()

+---+--------------------+----------------+-----------------+----+---+---+
|faa|                name|             lat|              lon| alt| tz|dst|
+---+--------------------+----------------+-----------------+----+---+---+
|04G|   Lansdowne Airport|      41.1304722|      -80.6195833|1044| -5|  A|
|06A|Moton Field Munic...|      32.4605722|      -85.6800278| 264| -5|  A|
|06C| Schaumburg Regional|      41.9893408|      -88.1012428| 801| -6|  A|
|06N|     Randall Airport|       41.431912|      -74.3915611| 523| -5|  A|
|09J|Jekyll Island Air...|      31.0744722|      -81.4277778|  11| -4|  A|
|0A9|Elizabethton Muni...|      36.3712222|      -82.1734167|1593| -4|  A|
|0G6|Williams County A...|      41.4673056|      -84.5067778| 730| -5|  A|
|0G7|Finger Lakes Regi...|      42.8835647|      -76.7812318| 492| -5|  A|
|0P2|Shoestring Aviati...|      39.7948244|      -76.6471914|1000| -5|  U|
|0S9|Jefferson County ...|      48.0538086|     -122.8106436| 108| -8|  A|
|0W3|Harford County Ai...

In [None]:
ss.catalog.listDatabases()

[Database(name='default', description='default database', locationUri='file:/Users/vkocaman/Python_Projects/Leiden/Spark/spark-warehouse')]

In [None]:
flights = ss.read.csv('flights_small.csv', header=True)

In [None]:

flights.columns

['year',
 'month',
 'day',
 'dep_time',
 'dep_delay',
 'arr_time',
 'arr_delay',
 'carrier',
 'tailnum',
 'flight',
 'origin',
 'dest',
 'air_time',
 'distance',
 'hour',
 'minute']

In [None]:
# add new column

flights = flights.withColumn('duration_hrs', flights.air_time / 60)

flights.show(5)

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+------------------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|      duration_hrs|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+------------------+
|2014|   12|  8|     658|       -7|     935|       -5|     VX| N846VA|  1780|   SEA| LAX|     132|     954|   6|    58|               2.2|
|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA| HNL|     360|    2677|  10|    40|               6.0|
|2014|    3|  9|    1443|       -2|    1652|        2|     VX| N847VA|   755|   SEA| SFO|     111|     679|  14|    43|              1.85|
|2014|    4|  9|    1705|       45|    1839|       34|     WN| N360SW|   344|   PDX| SJC|      83|     569|  17|     5|1.3833333333333333|
|2014|    3|  9|     754|  

In [None]:
# Filter flights with a SQL string
long_flights1 = flights.filter('distance > 1000')

# Filter flights with a boolean column
long_flights2 = flights.filter(flights.distance > 1000)

# Examine the data to check they're equal
long_flights1.show(3)
long_flights2.show(3)

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+------------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|duration_hrs|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+------------+
|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA| HNL|     360|    2677|  10|    40|         6.0|
|2014|    4| 19|    1236|       -4|    1508|       -7|     AS| N309AS|   490|   SEA| SAN|     135|    1050|  12|    36|        2.25|
|2014|   11| 19|    1812|       -3|    2352|       -4|     AS| N564AS|    26|   SEA| ORD|     198|    1721|  18|    12|         3.3|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+------------+
only showing top 3 rows

+----+-----+---+--------+---------+----

In [None]:
# Select the first set of columns
selected1 = flights.select("tailnum","origin", "dest")

# Select the second set of columns
temp = flights.select(flights.origin, flights.dest, flights.carrier)

temp.show(10)

# Define first filter
filterA = (flights.origin == "SEA")

# Define second filter
filterB = (flights.dest == "PDX")

# Filter the data, first by filterA then by filterB
selected2 = temp.filter(filterA).filter(filterB)

selected2.show(3)

+------+----+-------+
|origin|dest|carrier|
+------+----+-------+
|   SEA| LAX|     VX|
|   SEA| HNL|     AS|
|   SEA| SFO|     VX|
|   PDX| SJC|     WN|
|   SEA| BUR|     AS|
|   PDX| DEN|     WN|
|   PDX| OAK|     WN|
|   SEA| SFO|     VX|
|   SEA| SAN|     AS|
|   SEA| ORD|     AS|
+------+----+-------+
only showing top 10 rows

+------+----+-------+
|origin|dest|carrier|
+------+----+-------+
|   SEA| PDX|     OO|
|   SEA| PDX|     OO|
|   SEA| PDX|     OO|
+------+----+-------+
only showing top 3 rows



In [None]:
# Define avg_speed
avg_speed = (flights.distance/(flights.air_time/60)).alias("avg_speed")

# Select the correct columns
speed1 = flights.select("origin", "dest", "tailnum", avg_speed)

speed1.show(5)

# Create the same table using a SQL expression
speed2 = flights.selectExpr("origin", "dest", "tailnum", "distance/(air_time/60) as avg_speed")

speed2.show(5)


+------+----+-------+------------------+
|origin|dest|tailnum|         avg_speed|
+------+----+-------+------------------+
|   SEA| LAX| N846VA| 433.6363636363636|
|   SEA| HNL| N559AS| 446.1666666666667|
|   SEA| SFO| N847VA|367.02702702702703|
|   PDX| SJC| N360SW| 411.3253012048193|
|   SEA| BUR| N612AS| 442.6771653543307|
+------+----+-------+------------------+
only showing top 5 rows

+------+----+-------+------------------+
|origin|dest|tailnum|         avg_speed|
+------+----+-------+------------------+
|   SEA| LAX| N846VA| 433.6363636363636|
|   SEA| HNL| N559AS| 446.1666666666667|
|   SEA| SFO| N847VA|367.02702702702703|
|   PDX| SJC| N360SW| 411.3253012048193|
|   SEA| BUR| N612AS| 442.6771653543307|
+------+----+-------+------------------+
only showing top 5 rows



In [None]:
# casting: flights was read without inferSchema, so every column is a string; cast the numeric ones

flights = flights.withColumn("distance", flights.distance.cast("float"))

In [None]:
flights = flights.withColumn("air_time", flights.air_time.cast("float"))

In [None]:
flights = flights.withColumn("dep_delay", flights.dep_delay.cast("float"))

In [None]:
flights.describe('air_time', 'distance', "dep_delay").show()

+-------+------------------+-----------------+------------------+
|summary|          air_time|         distance|         dep_delay|
+-------+------------------+-----------------+------------------+
|  count|              9925|            10000|              9952|
|   mean|152.88423173803525|        1208.1516| 6.068629421221865|
| stddev|  72.8656286392139|656.8599023464376|28.808608062751805|
|    min|              20.0|             93.0|             -19.0|
|    max|             409.0|           2724.0|             886.0|
+-------+------------------+-----------------+------------------+



In [None]:
# Find the shortest flight from PDX in terms of distance
flights.filter(flights.origin == "PDX").groupBy().min("distance").show()

# Find the longest flight from SEA in terms of duration
flights.filter(flights.origin == "SEA").groupBy().max("air_time").show()

+-------------+
|min(distance)|
+-------------+
|        106.0|
+-------------+

+-------------+
|max(air_time)|
+-------------+
|        409.0|
+-------------+



In [None]:
# Average duration of Delta flights departing from SEA
flights.filter(flights.carrier == "DL")\
       .filter(flights.origin == "SEA")\
       .groupBy().avg('air_time')\
       .show()

# Total hours in the air
flights.withColumn("duration_hrs", flights.air_time/60).groupBy().sum("duration_hrs").show()

+------------------+
|     avg(air_time)|
+------------------+
|188.20689655172413|
+------------------+

+------------------+
| sum(duration_hrs)|
+------------------+
|25289.600000000126|
+------------------+



In [None]:
# Group by tailnum
by_plane = flights.groupBy("tailnum")

# Number of flights each plane made
by_plane.count().show()

# Group by origin
by_origin = flights.groupBy("origin")

# Average duration of flights from PDX and SEA
by_origin.avg("air_time").show()

+-------+-----+
|tailnum|count|
+-------+-----+
| N442AS|   38|
| N102UW|    2|
| N36472|    4|
| N38451|    4|
| N73283|    4|
| N513UA|    2|
| N954WN|    5|
| N388DA|    3|
| N567AA|    1|
| N516UA|    2|
| N927DN|    1|
| N8322X|    1|
| N466SW|    1|
|  N6700|    1|
| N607AS|   45|
| N622SW|    4|
| N584AS|   31|
| N914WN|    4|
| N654AW|    2|
| N336NW|    1|
+-------+-----+
only showing top 20 rows

+------+------------------+
|origin|     avg(air_time)|
+------+------------------+
|   SEA| 160.4361496051259|
|   PDX|137.11543248288737|
+------+------------------+



In [None]:
import pyspark.sql.functions as F

# Group by month and dest
by_month_dest = flights.groupBy("month", "dest")

# Average departure delay by month and destination
by_month_dest.avg("dep_delay").show()

# Standard deviation
by_month_dest.agg(F.stddev("dep_delay")).show()

+-----+----+--------------------+
|month|dest|      avg(dep_delay)|
+-----+----+--------------------+
|   11| TUS| -2.3333333333333335|
|   11| ANC|   7.529411764705882|
|    1| BUR|               -1.45|
|    1| PDX| -5.6923076923076925|
|    6| SBA|                -2.5|
|    5| LAX|-0.15789473684210525|
|   10| DTW|                 2.6|
|    6| SIT|                -1.0|
|   10| DFW|  18.176470588235293|
|    3| FAI|                -2.2|
|   10| SEA|                -0.8|
|    2| TUS| -0.6666666666666666|
|   12| OGG|  25.181818181818183|
|    9| DFW|   4.066666666666666|
|    5| EWR|               14.25|
|    3| RDM|                -6.2|
|    8| DCA|                 2.6|
|    7| ATL|   4.675675675675675|
|    4| JFK| 0.07142857142857142|
|   10| SNA| -1.1333333333333333|
+-----+----+--------------------+
only showing top 20 rows

+-----+----+----------------------+
|month|dest|stddev_samp(dep_delay)|
+-----+----+----------------------+
|   11| TUS|    3.0550504633038935|
|   11| ANC|  
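The column header in the output above reads stddev_samp(dep_delay). In pyspark.sql.functions, stddev() is an alias for stddev_samp(), the sample standard deviation, which divides by n - 1. stddev_pop() divides by n instead. Here is a small plain-Python check of the difference, using toy numbers rather than flight data:

```python
import math
import statistics

values = [2.0, 4.0, 4.0, 4.0, 5.0, 5.0, 7.0, 9.0]  # toy numbers, not flight data

sample_sd = statistics.stdev(values)       # squared deviations summed, divided by n - 1
population_sd = statistics.pstdev(values)  # squared deviations summed, divided by n

# the sample version computed by hand
mean = sum(values) / len(values)
manual_sample_sd = math.sqrt(sum((v - mean) ** 2 for v in values) / (len(values) - 1))

print(sample_sd, population_sd, manual_sample_sd)
```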

In [None]:
airports.show()

+---+--------------------+----------------+-----------------+----+---+---+
|faa|                name|             lat|              lon| alt| tz|dst|
+---+--------------------+----------------+-----------------+----+---+---+
|04G|   Lansdowne Airport|      41.1304722|      -80.6195833|1044| -5|  A|
|06A|Moton Field Munic...|      32.4605722|      -85.6800278| 264| -5|  A|
|06C| Schaumburg Regional|      41.9893408|      -88.1012428| 801| -6|  A|
|06N|     Randall Airport|       41.431912|      -74.3915611| 523| -5|  A|
|09J|Jekyll Island Air...|      31.0744722|      -81.4277778|  11| -4|  A|
|0A9|Elizabethton Muni...|      36.3712222|      -82.1734167|1593| -4|  A|
|0G6|Williams County A...|      41.4673056|      -84.5067778| 730| -5|  A|
|0G7|Finger Lakes Regi...|      42.8835647|      -76.7812318| 492| -5|  A|
|0P2|Shoestring Aviati...|      39.7948244|      -76.6471914|1000| -5|  U|
|0S9|Jefferson County ...|      48.0538086|     -122.8106436| 108| -8|  A|
|0W3|Harford County Ai...

In [None]:
# Rename the faa column
airports = airports.withColumnRenamed("faa", "dest")

# Join the DataFrames
flights_with_airports = flights.join(airports, on="dest", how="leftouter")

# Examine the data again
flights_with_airports.show(3)

+----+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+--------+--------+----+------+------------+------------------+---------+-----------+---+---+---+
|dest|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|air_time|distance|hour|minute|duration_hrs|              name|      lat|        lon|alt| tz|dst|
+----+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+--------+--------+----+------+------------+------------------+---------+-----------+---+---+---+
| LAX|2014|   12|  8|     658|     -7.0|     935|       -5|     VX| N846VA|  1780|   SEA|   132.0|   954.0|   6|    58|         2.2|  Los Angeles Intl|33.942536|-118.408075|126| -8|  A|
| HNL|2014|    1| 22|    1040|      5.0|    1505|        5|     AS| N559AS|   851|   SEA|   360.0|  2677.0|  10|    40|         6.0|     Honolulu Intl|21.318681|-157.922428| 13|-10|  N|
| SFO|2014|    3|  9|    1443|     -2.0|    1652|        2|     VX| N8
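With how="leftouter", every row of flights is kept. When a destination has no match in airports, the airport columns are filled with null. The same behavior can be sketched with sqlite3 on made-up rows, where the code XYZ is a hypothetical destination with no airport entry:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE flights (flight INTEGER, dest TEXT)")
conn.execute("CREATE TABLE airports (dest TEXT, name TEXT)")
conn.executemany("INSERT INTO flights VALUES (?, ?)",
                 [(1, "LAX"), (2, "HNL"), (3, "XYZ")])  # XYZ has no matching airport
conn.executemany("INSERT INTO airports VALUES (?, ?)",
                 [("LAX", "Los Angeles Intl"), ("HNL", "Honolulu Intl")])

# inner join: flights without a matching airport disappear
inner = conn.execute(
    "SELECT f.flight, a.name FROM flights f JOIN airports a ON f.dest = a.dest"
).fetchall()

# left outer join: every flight survives; unmatched airport columns become NULL (None)
left = conn.execute(
    "SELECT f.flight, a.name FROM flights f LEFT OUTER JOIN airports a ON f.dest = a.dest"
).fetchall()

print(inner)
print(left)
```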

In [None]:
flights_with_airports.limit(10).show()

+----+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+--------+--------+----+------+------------------+--------------------+---------+-----------+----+---+---+
|dest|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|air_time|distance|hour|minute|      duration_hrs|                name|      lat|        lon| alt| tz|dst|
+----+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+--------+--------+----+------+------------------+--------------------+---------+-----------+----+---+---+
| LAX|2014|   12|  8|     658|     -7.0|     935|       -5|     VX| N846VA|  1780|   SEA|   132.0|   954.0|   6|    58|               2.2|    Los Angeles Intl|33.942536|-118.408075| 126| -8|  A|
| HNL|2014|    1| 22|    1040|      5.0|    1505|        5|     AS| N559AS|   851|   SEA|   360.0|  2677.0|  10|    40|               6.0|       Honolulu Intl|21.318681|-157.922428|  13|-10|  N|
| SFO|2014|    3|  9|    

In [None]:
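%matplotlib inline
# toPandas() collects the entire DataFrame onto the driver, so use it only on data that fits in memory
a = flights_with_airports.toPandas()
a["month"].hist()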

In [None]:
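# fillna() returns a new DataFrame; with a numeric value it only fills nulls in numeric columns
flights_with_airports = flights_with_airports.fillna(0)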

In [None]:

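import csv

filename = "/Users/vkocaman/Python_Projects/Leiden/Hadoop/ml-100k/u.data"

# read (user_id, item_id) pairs from the tab-separated MovieLens u.data file
with open(filename, 'r') as fh:
    movie_rdd = sc.parallelize([(x[0], x[1]) for x in csv.reader(fh, delimiter='\t')])

movie_rdd.take(5)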

[('196', '242'), ('186', '302'), ('22', '377'), ('244', '51'), ('166', '346')]

In [None]:
#ss.read.csv("/Users/vkocaman/Python_Projects/Leiden/Hadoop/ml-100k/data.csv", inferSchema=True, header=None).collect()

In [None]:
# word-count pattern: count the ratings made by each user (the first field of every pair)

counts_rdd = movie_rdd.map(lambda pair: (pair[0], 1)) \
             .reduceByKey(lambda a, b: a + b)

# swap to (count, user_id), sort by count in descending order and keep the top 10

counts_rdd.map(lambda x: (x[1], x[0])) \
    .sortByKey(ascending=False) \
    .take(10)


[(737, '405'),
 (685, '655'),
 (636, '13'),
 (540, '450'),
 (518, '276'),
 (493, '416'),
 (490, '537'),
 (484, '303'),
 (480, '234'),
 (448, '393')]
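The map / reduceByKey / sortByKey chain above can be mirrored in plain Python to make each step explicit. The following is a sketch on hypothetical (user_id, item_id) pairs, not on u.data:

```python
from collections import defaultdict
from functools import reduce

# hypothetical (user_id, item_id) pairs in the same shape as movie_rdd
pairs = [("u1", "m10"), ("u2", "m11"), ("u1", "m12"), ("u3", "m10"), ("u1", "m13")]

# map: emit (key, 1) for every record
mapped = [(user, 1) for user, _item in pairs]

# reduceByKey: gather the values of each key, then fold them with the same function
groups = defaultdict(list)
for key, value in mapped:
    groups[key].append(value)
counts = {key: reduce(lambda a, b: a + b, values) for key, values in groups.items()}

# swap to (count, key) and sort in descending order, like map(...).sortByKey(ascending=False)
top = sorted(((c, k) for k, c in counts.items()), reverse=True)
print(top)
```

Here ties on the count are broken by user id because whole tuples are compared. Spark's sortByKey only orders by the key (the count), so the order of tied users is not guaranteed there.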

In [None]:
# cross-check in plain Python: collect the user id (first field) of every rating
import csv

with open(filename, 'r') as fh:
    m = [row[0] for row in csv.reader(fh, delimiter='\t')]

In [None]:
from collections import Counter

max(Counter(m).values())

Counter(m)["405"]

737

### PySpark UDF

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from pyspark.sql.types import StringType

ss = SparkSession.builder.appName('SDDM').master("local[*]").getOrCreate()


df_spark = ss.read.csv("employee_email_data_v2.csv", 
                    header=True, inferSchema=True)

def generate_udf():
    
    # plain Python function that builds a sentence from one row's values
    def spark_func(user, tag, hour):

        if "out" in user:
            txt_1 = user.replace("out_", "").replace("_", " ") + " who is working for another company"
        else:
            txt_1 = user.replace("_", " ") + " who is working for our company"

        if tag == "from":
            txt_2 = " sent an email"
        elif tag == "to":
            txt_2 = " received an email"

        if hour > 17 or hour < 9:
            txt_3 = " between 6 pm and 9 am"
        else:
            txt_3 = " during work hours"

        return txt_1 + txt_2 + txt_3
    
    # wrap the Python function as a Spark UDF that returns a string column
    return f.udf(spark_func, StringType())


In [None]:
%%time

df_spark = df_spark.withColumn('comment', 
                   generate_udf()(f.col('user_ids'), f.col('tag'), f.col('hour')))

# collect() forces Spark to evaluate the UDF, so %%time measures the actual work
df_spark.collect()


In [None]:
df_spark.show(5, False)

In [None]:
# https://www.linkedin.com/pulse/insider-spark-adventure-bar%C4%B1%C5%9F-can-tayiz/

# https://towardsdatascience.com/machine-learning-with-pyspark-and-mllib-solving-a-binary-classification-problem-96396065d2aa?gi=165a78ac88a6

### PySpark vs Pandas (similar functions)

http://localhost:8888/notebooks/Python_Projects/Leiden/Spark/python_spark_ortak_fonksiyonlar.ipynb

## Getting started with machine learning pipelines

In [None]:
# Read the text file as an RDD of lines, then lowercase and split each line on whitespace
text_file = sc.textFile("example_text.txt")
wordSeqs = text_file.map(lambda s: [w.lower() for w in s.split()])
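`s.split()` splits on whitespace only, so punctuation stays attached to words; that is why tokens such as `pounds.` and `was.` show up in the synonym list further down. One possible alternative, sketched here under the assumption that dropping punctuation and digits is acceptable for this text, is a small regex tokenizer. `tokenize` is a hypothetical helper, not part of the original notebook.

```python
import re


def tokenize(line):
    """Lowercase a line and keep runs of letters/apostrophes as tokens.

    Punctuation and digits are dropped; hyphenated words are split in two.
    """
    return re.findall(r"[a-z']+", line.lower())


print(tokenize("He paid five pounds. That was."))

# In the notebook the helper could replace the lambda, e.g.:
# wordSeqs = text_file.map(tokenize)
```

A module-level function like this is pickled and shipped to the executors just like the lambda, so it can be passed straight to `map`.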

In [None]:
from pyspark.mllib.feature import Word2Vec

# Train a Word2Vec model on the RDD of token lists
w2v = Word2Vec()
model = w2v.fit(wordSeqs)

# Find the 5 words closest to 'money'; each result is a (word, cosine similarity) pair
synonyms = model.findSynonyms('money', 5)

for word, similarity in synonyms:
    print("{}: {}".format(word, similarity))

becomes: 0.926846444606781
pounds.: 0.9183464050292969
fish: 0.9091755747795105
was.: 0.8968182802200317
poet: 0.8965718746185303
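`findSynonyms` returns `(word, cosine similarity)` pairs, so larger numbers mean *more* similar words (the scores are similarities, not distances). Below is a minimal pure-Python sketch of that kind of scoring; the 3-dimensional vectors are made up rather than taken from the trained model, `top_synonyms` is a hypothetical helper, and Spark's own implementation is vectorized and may differ in details.

```python
import math


def cosine_similarity(u, v):
    """dot(u, v) / (|u| * |v|); assumes neither vector is all zeros."""
    dot = sum(a * b for a, b in zip(u, v))
    norm_u = math.sqrt(sum(a * a for a in u))
    norm_v = math.sqrt(sum(b * b for b in v))
    return dot / (norm_u * norm_v)


def top_synonyms(word, vectors, num):
    """Rank every other word by cosine similarity to `word`, highest first."""
    query = vectors[word]
    scores = [(other, cosine_similarity(query, vec))
              for other, vec in vectors.items() if other != word]
    return sorted(scores, key=lambda pair: pair[1], reverse=True)[:num]


# Toy word vectors (hypothetical numbers)
vectors = {
    "money": [0.9, 0.1, 0.3],
    "cash": [0.8, 0.2, 0.35],
    "grass": [0.1, 0.9, 0.0],
}

for w, sim in top_synonyms("money", vectors, 2):
    print("{}: {:.3f}".format(w, sim))
```

Cosine similarity ignores vector length and compares only direction, which is why it ranges from -1 to 1 regardless of how large the embedding values are.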


In [None]:
# https://github.com/radanalyticsio/workshop-notebook/blob/master/pyspark.ipynb

# https://github.com/radanalyticsio/workshop-notebook/blob/master/ml-basics.ipynb



In [None]:
from flair.embeddings import FlairEmbeddings

# The Sentence object holds a sentence that we may want to embed or tag
from flair.data import Sentence

# init embedding
flair_embedding_forward = FlairEmbeddings('news-forward')

# create a sentence
sentence = Sentence('The grass is green .')

# embed words in sentence
flair_embedding_forward.embed(sentence)

[Sentence: "The grass is green ." - 5 Tokens]

In [None]:
from flair.embeddings import WordEmbeddings, FlairEmbeddings, StackedEmbeddings

# create a StackedEmbedding object that combines glove and forward/backward flair embeddings
stacked_embeddings = StackedEmbeddings([
                                        WordEmbeddings('glove'), 
                                        FlairEmbeddings('news-forward'), 
                                        FlairEmbeddings('news-backward'),
                                       ])

In [None]:
sentence = Sentence('The grass is green .')

# Embed the sentence with the StackedEmbeddings object, exactly as you would with a single embedding
stacked_embeddings.embed(sentence)

# Print each token and its embedding vector
for token in sentence:
    print(token)
    print(token.embedding)

Token: 1 The
tensor([-3.8194e-02, -2.4487e-01,  7.2812e-01,  ..., -2.5692e-05,
        -5.9604e-03, -2.5547e-03])
Token: 2 grass
tensor([-8.1353e-01,  9.4042e-01, -2.4048e-01,  ..., -6.7730e-05,
        -3.0360e-03, -1.3282e-02])
Token: 3 is
tensor([-0.5426,  0.4148,  1.0322,  ..., -0.0066, -0.0036, -0.0014])
Token: 4 green
tensor([-6.7907e-01,  3.4908e-01, -2.3984e-01,  ..., -2.2563e-05,
        -1.0894e-04, -4.3916e-03])
Token: 5 .
tensor([-3.3979e-01,  2.0941e-01,  4.6348e-01,  ...,  4.1382e-05,
        -4.4364e-04, -2.5425e-02])
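`StackedEmbeddings` concatenates, for each token, the vectors produced by its component embeddings (here GloVe plus forward and backward Flair), so each token's embedding length is the sum of the component lengths; this is why the tensors printed above are long enough to be elided with `...`. A toy pure-Python sketch of that concatenation follows; the short vectors and `stack_token_embeddings` are hypothetical, and flair itself performs this step on PyTorch tensors.

```python
def stack_token_embeddings(*component_vectors):
    """Concatenate one token's vectors from several embeddings into a single vector."""
    stacked = []
    for vec in component_vectors:
        stacked.extend(vec)
    return stacked


# Toy vectors for a single token (real ones are far longer)
glove_vec = [0.1, -0.2, 0.3]
flair_forward_vec = [0.5, 0.4]
flair_backward_vec = [-0.1, 0.0]

stacked = stack_token_embeddings(glove_vec, flair_forward_vec, flair_backward_vec)
print(len(stacked))  # 7 = 3 + 2 + 2
```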


In [None]:
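# embed() stores the vectors on the tokens of the Sentence object it is given, so embed and
# inspect the same object (looping over a freshly created Sentence("Spain") prints tensor([]))
sentence = Sentence("Spain")
stacked_embeddings.embed(sentence)

for token in sentence:
    print(token)
    print(token.embedding)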


## Additional resources

https://towardsdatascience.com/the-hitchhikers-guide-to-handle-big-data-using-spark-90b9be0fe89a

https://medium.com/@mrpowers/manually-creating-spark-dataframes-b14dae906393

https://www.youtube.com/watch?v=QaoJNXW6SQo
(Spark Tutorial For Beginners | Big Data Spark Tutorial | Apache Spark Tutorial | Simplilearn)

Querying a large dataset with PySpark SQL from S3 on a local Jupyter Notebook
https://blog.insightdatascience.com/using-jupyter-on-apache-spark-step-by-step-with-a-terabyte-of-reddit-data-ef4d6c13959a

https://www.datacamp.com/community/tutorials/apache-spark-tutorial-machine-learning

https://www.analyticsvidhya.com/blog/2016/09/comprehensive-introduction-to-apache-spark-rdds-dataframes-using-pyspark/

https://databricks.com/blog/2016/07/14/a-tale-of-three-apache-spark-apis-rdds-dataframes-and-datasets.html

https://s3.amazonaws.com/assets.datacamp.com/blog_assets/PySpark_SQL_Cheat_Sheet_Python.pdf

https://s3.amazonaws.com/assets.datacamp.com/blog_assets/PySpark_Cheat_Sheet_Python.pdf

https://blog.usejournal.com/spark-study-notes-core-concepts-visualized-5256c44e4090

https://data-flair.training/blogs/spark-tutorial/

https://nbviewer.jupyter.org/github/mepa/sads-pyspark/blob/master/2017-09-14-PySpark-Workshop.slides.html

https://towardsdatascience.com/3-methods-for-parallelization-in-spark-6a1a4333b473

Spark Web UI
https://jaceklaskowski.gitbooks.io/mastering-apache-spark/spark-webui.html

More detail about the Spark UI:
https://blog.insightdatascience.com/using-jupyter-on-apache-spark-step-by-step-with-a-terabyte-of-reddit-data-ef4d6c13959a

https://towardsdatascience.com/machine-learning-with-pyspark-and-mllib-solving-a-binary-classification-problem-96396065d2aa


https://towardsdatascience.com/how-does-apache-spark-run-on-a-cluster-974ec2731f20

https://stackoverflow.com/questions/32356143/what-does-setmaster-local-mean-in-spark

https://techvidvan.com/tutorials/spark-modes-of-deployment/


Spark Streaming

Test with a local netcat data server (https://spark.apache.org/docs/latest/streaming-programming-guide.html)

https://engineering.billymob.com/introducing-spark-streaming-c1b8be36c775

https://engineering.billymob.com/apache-spark-streaming-kafka-0-10-1f3c29a694cb

https://engineering.billymob.com/feature-integrating-kafka-with-spark-streaming-47763f6bcf58

https://medium.com/@kass09/spark-streaming-kafka-in-python-a-test-on-local-machine-edd47814746

*https://www.rittmanmead.com/blog/2017/01/getting-started-with-spark-streaming-with-python-and-kafka/ (two different versions of the application: windowed vs. batch)

(Getting+Started+with+Spark+Streaming+with+Python+and+Kafka.ipynb)

Part 2 (writes filtered tweets to another Kafka topic):
https://www.rittmanmead.com/blog/2017/01/data-processing-and-enrichment-in-spark-streaming-with-python-and-kafka/

https://medium.com/@mukeshkumar_46704/getting-streaming-data-from-kafka-with-spark-streaming-using-python-9cd0922fa904

Full code
https://gist.github.com/rmoff/fb033086b285655ffe7f9ff0582dedbf

http://tlfvincent.github.io/2016/09/25/kafka-spark-pipeline-part-1/

https://www.supergloo.com/fieldnotes/spark-streaming-kafka-example/

https://www.opcito.com/blogs/building-a-real-time-data-pipeline-using-spark-streaming-and-kafka/

https://www.opcito.com/blogs/data-ingestion-with-hadoop-yarn-spark-and-kafka/


Install and run PySpark with Jupyter Notebook on your local machine

https://towardsdatascience.com/how-to-use-pyspark-on-your-computer-9c7180075617

https://medium.freecodecamp.org/how-to-set-up-pyspark-for-your-jupyter-notebook-7399dd3cb389

https://github.com/tirthajyoti/Spark-with-Python

Run PySpark with Docker on your local machine

https://levelup.gitconnected.com/using-docker-and-pyspark-134cd4cab867

https://medium.com/@suci/running-pyspark-on-jupyter-notebook-with-docker-602b18ac4494

https://medium.com/@GaryStafford/getting-started-with-pyspark-for-big-data-analytics-using-jupyter-notebooks-and-docker-ba39d2e3d6c7 (including Postgres db)

Install and run PySpark with Jupyter Notebook on AWS EC2

https://medium.com/@josemarcialportilla/getting-spark-python-and-jupyter-notebook-running-on-amazon-ec2-dec599e1c297


Install and run PySpark on AWS EMR (with Hadoop and Spark pre-installed)

https://towardsdatascience.com/end-to-end-distributed-ml-using-aws-emr-apache-spark-pyspark-and-mongodb-tutorial-with-4d1077f68381 

https://medium.com/@datitran/quickstart-pyspark-with-anaconda-on-aws-660252b88c9a

**https://medium.com/idealo-tech-blog/using-terraform-to-quick-start-pyspark-on-aws-2bc8ce9dcac

Install and run PySpark with Jupyter Notebook on GCP Dataproc

https://towardsdatascience.com/data-science-for-startups-pyspark-1acf51e9d6ba

https://cloud.google.com/blog/products/gcp/google-cloud-platform-for-data-scientists-using-jupyter-notebooks-with-apache-spark-on-google-cloud

Submit PySpark jobs on GCP Dataproc (with Hadoop and Spark pre-installed)

https://towardsdatascience.com/step-by-step-tutorial-pyspark-sentiment-analysis-on-google-dataproc-fef9bef46468

Run PySpark on DSLab Machines

Running Spark clusters on Databricks Community Edition (mention it to the students as another option; no need to go into detail)

Spark MLlib (ML with PySpark)

https://towardsdatascience.com/sentiment-analysis-with-pyspark-bc8e83f80c35

Data cleaning: https://github.com/radanalyticsio/workshop-notebook/blob/master/workshop.ipynb

https://github.com/radanalyticsio/workshop-notebook/blob/master/ml-basics.ipynb

Deploying PySpark ML Model on Google Compute Engine as a REST API

https://towardsdatascience.com/deploying-pyspark-ml-model-on-google-compute-engine-as-a-rest-api-d69e126b30b1


NASA log analysis
https://opensource.com/article/19/5/log-data-apache-spark
https://opensource.com/article/19/5/visualize-log-data-apache-spark


### Technicalities of Spark

https://spark.apache.org/docs/latest/spark-standalone.html

http://devopspy.com/python/apache-spark-pyspark-centos-rhel/

https://jaceklaskowski.gitbooks.io/mastering-apache-spark/spark-webui.html

https://medium.com/ymedialabs-innovation/apache-spark-on-a-multi-node-cluster-b75967c8cb2b

https://github.com/jaceklaskowski/mastering-apache-spark-book/blob/master/spark-standalone-example-2-workers-on-1-node-cluster.adoc

http://spark.apache.org/docs/latest/submitting-applications.html

https://docs.anaconda.com/anaconda-scale/howto/spark-basic/

https://www.datacamp.com/community/tutorials/apache-spark-python

https://data-flair.training/blogs/install-apache-spark-multi-node-cluster/

https://www.programcreek.com/2018/11/install-spark-on-ubuntu-standalone-mode/