In [None]:
# CREATE RDDs
# There are 3 ways to create an RDD:
# 1st: pass an existing Python collection to SparkContext's parallelize() method
# 2nd: load an external dataset, e.g. files on HDFS (Hadoop Distributed File System),
#      an Amazon S3 bucket, or the lines of a local text file
# 3rd: apply a transformation to an existing RDD

# 1st method: parallelized collection
numRDD = sc.parallelize([1, 2, 3, 4])
# a Python string is an iterable of characters, so this RDD holds the single
# characters 'H', 'e', 'l', ... rather than one 'Hello World' element
helloRDD = sc.parallelize('Hello World')
print(type(helloRDD))

# 2nd method: external dataset, one RDD element per line of the text file
fileRDD = sc.textFile('README.md')
print(type(fileRDD))


# LOAD DATA INTO RDDs
# understanding how Spark splits data into partitions lets us control parallelism
# create an RDD with 6 partitions: parallelize() takes the partition count as
# numSlices (minPartitions is a textFile() argument, not a parallelize() one)
numRDD = sc.parallelize(range(10), numSlices=6)

# textFile() takes a *minimum* number of partitions
numRDD = sc.textFile('README.md', minPartitions=6)

print('The number of partitions in numRDD is', numRDD.getNumPartitions())

# RDD OPERATIONS in PySpark
# operations = transformations + actions
# - a transformation builds a new RDD from an existing one
# - an action runs a computation on an RDD and returns a result
# transformations are evaluated lazily: Spark records the chain of
# transformations (the lineage) and only runs it when an action needs the
# result; that recorded lineage is also what lets Spark rebuild lost partitions

# map(): apply a function to every element
RDD = sc.parallelize([1, 2, 3, 4])
RDD_map = RDD.map(lambda num: num * num)

# filter(): keep only the elements for which the predicate is true
RDD = sc.parallelize([1, 2, 3, 4])
RDD_filter = RDD.filter(lambda num: num > 2)

# flatMap(): every input element may produce zero or more output elements;
# here each sentence is split into its words
RDD = sc.parallelize(['Hello world', 'How are you'])
RDD_flatmap = RDD.flatMap(lambda sentence: sentence.split(" "))

# union(): an RDD holding the elements of both RDDs (duplicates are kept)
# the filters match whole whitespace-separated tokens, not substrings
inputRDD = sc.textFile("logs.txt")
errorRDD = inputRDD.filter(lambda line: "error" in line.split())
warningsRDD = inputRDD.filter(lambda line: "warnings" in line.split())
combinedRDD = errorRDD.union(warningsRDD)

In [None]:
# actions are operations applied to an RDD that trigger the computation and
# return a value to the driver
# basic RDD actions: collect(), take(N), first(), count()
# only the last expression of a notebook cell is displayed, so print the others
print(RDD_map.collect())  # every element: [1, 4, 9, 16]
print(RDD_map.take(2))    # first 2 elements: [1, 4]
print(RDD_map.first())    # first element: 1
RDD_flatmap.count()       # number of words: 5

In [None]:
# PAIR RDDs (key/value RDDs)
# a pair RDD is an RDD of (key, value) tuples: key = identifier, value = data
# two ways to build one: from a list of key-value tuples, or by mapping a
# regular RDD into key/value form

# from a list of key-value tuples
my_tuple = [('Sam', 23), ('Mary', 34), ('Peter', 25)]
pairRDD_tuple = sc.parallelize(my_tuple)

# from a regular RDD: split every "name age" string into a (name, age) pair;
# the age stays a string ('23') because nothing converts it to int
my_list = ['Sam 23', 'Mary 34', 'Peter 25']
regularRDD = sc.parallelize(my_list)
pairRDD_RDD = regularRDD.map(lambda rec: (rec.split(' ')[0], rec.split(' ')[1]))

# some transformations for pair RDDs: reduceByKey(), groupByKey(), sortByKey(), join()

# reduceByKey() combines the values that share the same key;
# parallelize() takes ONE collection, so the pairs go into a single list
regularRDD = sc.parallelize([("Messi", 23), ("Ronaldo", 34),
                             ("Neymar", 22), ("Messi", 24)])
pairRDD_reducebykey = regularRDD.reduceByKey(lambda x, y: x + y)
print(pairRDD_reducebykey.collect())

# sortByKey() sorts a pair RDD by key; swap (key, value) to (value, key) first
# so the sort is done on the summed values
pairRDD_reducebykey_rev = pairRDD_reducebykey.map(lambda x: (x[1], x[0]))
print(pairRDD_reducebykey_rev.sortByKey(ascending=False).collect())

# groupByKey() gathers all values that share a key into one iterable
airports = [("US", "JFK"), ("UK", "LHR"), ("FR", "CDG"), ("US", "SFO")]
regularRDD = sc.parallelize(airports)
pairRDD_group = regularRDD.groupByKey().collect()
for cont, air in pairRDD_group:
    print(cont, list(air))

# join() connects two pair RDDs on their keys (pairs with matching keys only);
# all three pairs belong inside the single list passed to parallelize()
RDD1 = sc.parallelize([("Messi", 24), ("Ronaldo", 32), ("Neymar", 24)])
RDD2 = sc.parallelize([("Ronaldo", 40), ("Neymar", 120), ("Messi", 50)])
print(RDD1.join(RDD2).collect())

# Practice
# a pair RDD with integer keys and values; key 3 appears twice
Rdd = sc.parallelize([(1, 2), (3, 4), (3, 6), (4, 5)])

# add up the values that share a key: (3, 4) and (3, 6) collapse into (3, 10)
Rdd_Reduced = Rdd.reduceByKey(lambda left, right: left + right)

# bring the reduced pairs back to the driver and report each one
for key, summed in Rdd_Reduced.collect():
    print("Key {} has {} Counts".format(key, summed))

# ADVANCED RDD actions
# reduce() aggregates the elements on the cluster and returns one value to the
# driver, unlike collect(), which ships every element to the driver
x = [1, 3, 4, 6]
RDD = sc.parallelize(x)
print(RDD.reduce(lambda x, y: x + y))
# saveAsTextFile() writes each partition as a separate part-file inside the
# output directory; it fails if that directory already exists
RDD.saveAsTextFile("FileName")
# coalesce(1) first merges the RDD into one partition, so the output directory
# holds a single part-file; it needs its own directory, since "FileName" is
# already taken by the save above
RDD.coalesce(1).saveAsTextFile("FileName_single")

# RDD actions: countByKey(), collectAsMap()
# countByKey() counts the number of elements for each key
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
for k, v in rdd.countByKey().items():
    print(k, v)

# collectAsMap() returns the key-value pairs to the driver as a dict
print(sc.parallelize([(1, 2), (3, 4)]).collectAsMap())

# Practice
# countByKey() is an action: it counts the elements of each key and returns the
# counts to the driver as a dictionary
total = Rdd.countByKey()

# inspect the Python type of the result
print("The type of total is", type(total))

# one line per key with its number of occurrences
for key, occurrences in total.items():
    print("key", key, "has", occurrences, "counts")

# WORD COUNT on a text file
# one RDD element per line of the file at file_path
baseRDD = sc.textFile(file_path)

# break every line into single-space-separated tokens
splitRDD = baseRDD.flatMap(lambda line: line.split(' '))

# total token count (stop words included)
print("Total number of words in splitRDD:", splitRDD.count())

# drop stop words: a token is lower-cased only for the comparison against
# stop_words; the tokens that survive keep their original case
splitRDD_no_stop = splitRDD.filter(lambda token: token.lower() not in stop_words)

# pair every remaining word with a count of 1
splitRDD_no_stop_words = splitRDD_no_stop.map(lambda token: (token, 1))

# add up the 1s per word to get the number of occurrences of each word
resultRDD = splitRDD_no_stop_words.reduceByKey(lambda a, b: a + b)

# first 10 (word, count) pairs; these are not sorted by frequency
for pair in resultRDD.take(10):
    print(pair)

# flip to (count, word) so the count becomes the sort key
resultRDD_swap = resultRDD.map(lambda pair: (pair[1], pair[0]))

# highest counts first
resultRDD_swap_sort = resultRDD_swap.sortByKey(ascending=False)

# the 10 most frequent words with their counts
for count_word in resultRDD_swap_sort.take(10):
    print("{} has {} counts".format(count_word[1], count_word[0]))

# sort the reduced pairs by key, largest key first
Rdd_Reduced_Sort = Rdd_Reduced.sortByKey(ascending=False)

# report each (key, summed value) pair in that order
for key, summed in Rdd_Reduced_Sort.collect():
    print("Key {} has {} Counts".format(key, summed))

# PySpark DATAFRAMES
# DataFrames are Spark's high-level API for working with structured data
# PySpark SQL is the Spark library for structured data; it gives Spark
# information about both the structure of the data and the computation being
# performed, and exposes that through the DataFrame abstraction
# SparkSession does for DataFrames what SparkContext does for RDDs: it creates
# DataFrames, registers them as tables, executes SQL over tables and caches tables
# DataFrames can be created in two ways: from an existing RDD with
# createDataFrame(), or from various data sources with SparkSession's read
# A schema describes the structure of the data in a DataFrame (column names and
# types); it helps Spark optimize queries on the data

# CREATE a DataFrame from an RDD; schema is only the list of column names here,
# so the column types are inferred from the data
iphones_RDD = sc.parallelize([("XR", 2018, 5.65, 2.79, 6.24),
                              ("Xs", 2018, 5.94, 2.98, 6.84)])
names = ["Model", "Year", "Height", "Width", "Weight"]
iphones_df = spark.createDataFrame(iphones_RDD, schema=names)
print(type(iphones_df))

# CREATE a DataFrame from a CSV file: use the first line as column names and
# infer the column types from the data
df_csv = spark.read.csv("filename.csv", header=True, inferSchema=True)
# CREATE a DataFrame from a JSON file: the JSON reader infers the schema by
# itself and does not accept the CSV-only header/inferSchema options
df_json = spark.read.json("filename.json")

# Practice
# (name, age) records
sample_list = [('Mona', 20), ('Jennifer', 34), ('John', 20), ('Jim', 26)]

# distribute the records as an RDD
rdd = sc.parallelize(sample_list)

# turn the RDD into a DataFrame with the columns Name and Age
names_df = spark.createDataFrame(rdd, schema=['Name', 'Age'])

# confirm that the result is a DataFrame
print("The type of names_df is", type(names_df))

# INTERACTING WITH PySpark DATAFRAMES
# DataFrames support both transformations and actions
# Common DataFrame transformations: select(), filter(), groupBy(), orderBy(),
# dropDuplicates(), withColumnRenamed()
# Common DataFrame actions/inspection: printSchema(), head(), show(), count(),
# describe(), plus the columns attribute
# NOTE(review): assumes test_df has already been loaded, e.g. with
# spark.read.csv("test.csv", header=True, inferSchema=True) as in the
# visualization section further down -- confirm before running this cell

# select() picks columns by name
df_id_age = test_df.select('Age')
# show() is an action that prints the first 20 rows by default; here only 3
df_id_age.show(3)

# filter() keeps the rows that satisfy a condition on a column
new_df_age21 = test_df.filter(test_df.Age > 21)
new_df_age21.show(3)

# groupby() (alias of groupBy()) groups the rows by the values of a column
test_df_age_group = test_df.groupby('Age')
test_df_age_group.count().show(3)

# orderBy() sorts the DataFrame by one or more columns
test_df_age_group.count().orderBy('Age').show(3)

# dropDuplicates() removes duplicate rows
test_df_no_dup = test_df.select('User ID', 'Gender', 'Age').dropDuplicates()
print(test_df_no_dup.count())

# withColumnRenamed() returns a new DataFrame with one column renamed
test_df_sex = test_df.withColumnRenamed('Gender', 'Sex')

# printSchema() prints the column names and types
test_df.printSchema()

# columns is a list attribute, not a method: calling it raises a TypeError
print(test_df.columns)

# describe() computes summary statistics (count, mean, stddev, min, max)
test_df.describe().show()

# PRACTICE
# first 10 rows
people_df.show(10)

# number of rows
print("There are {} rows in the people_df DataFrame.".format(people_df.count()))

# number of columns and their names (columns is a plain Python list)
print("There are {} columns in the people_df DataFrame and their names are {}"
      .format(len(people_df.columns), people_df.columns))

# keep only the name, sex and date of birth columns
people_df_sub = people_df.select('name', 'sex', 'date of birth')

# first 10 rows of the subset
people_df_sub.show(10)

# drop exact duplicate rows from the subset
people_df_sub_nodup = people_df_sub.dropDuplicates()

# row counts before and after de-duplication
print("There were {} rows before removing duplicates, and {} rows after removing duplicates"
      .format(people_df_sub.count(), people_df_sub_nodup.count()))

# split people_df by the value of the sex column
people_df_female = people_df.filter(people_df.sex == "female")
people_df_male = people_df.filter(people_df.sex == "male")

# row counts of both subsets
print("There are {} rows in the people_df_female DataFrame and {} rows in the people_df_male DataFrame"
      .format(people_df_female.count(), people_df_male.count()))


# INTERACT WITH PySpark SQL using SQL queries
# PySpark SQL is a high-level API built on top of Spark Core for structured data
# PySpark works with Spark SQL through the DataFrame API and through SQL queries
# SparkSession.sql() takes a SQL statement and returns the result as a DataFrame
# SQL queries cannot be run directly against DataFrames: first register the
# DataFrame as a temporary view with createOrReplaceTempView()
# NOTE(review): df is presumably a placeholder DataFrame with columns field1
# and field2
df.createOrReplaceTempView("table1")
df2 = spark.sql("SELECT field1, field2 FROM table1")
print(df2.collect())

# SQL query to extract the Product_ID column from test_df
# NOTE(review): assumes test_df has already been loaded
test_df.createOrReplaceTempView("test_table")
query = '''SELECT Product_ID FROM test_table'''
test_product_df = spark.sql(query)
test_product_df.show(5)

# PRACTICE
# register people_df as the temporary view "people" so SQL can query it
people_df.createOrReplaceTempView("people")

# run the name query against the view and keep the result as a DataFrame
query = '''SELECT name FROM people'''
people_df_names = spark.sql(query)

# top 10 names
people_df_names.show(10)

# female / male subsets, selected with SQL instead of DataFrame.filter()
people_female_df = spark.sql('SELECT * FROM people WHERE sex=="female"')
people_male_df = spark.sql('SELECT * FROM people WHERE sex=="male"')

# row counts of both subsets
print("There are {} rows in the people_female_df and {} rows in the people_male_df DataFrames"
      .format(people_female_df.count(), people_male_df.count()))

# DATA VISUALIZATION with PySpark DataFrames
# 3 ways to plot graphs from PySpark DataFrames
# 1st: pyspark_dist_explore
# 2nd: toPandas()
# 3rd: HandySpark

# read the dataset once and reuse it for all three approaches
test_df = spark.read.csv("test.csv", header=True, inferSchema=True)

# pyspark_dist_explore: hist(), distplot(), pandas_histogram()
# NOTE: hist comes from the third-party package pyspark_dist_explore
# (from pyspark_dist_explore import hist), which this file does not import;
# its first argument is the matplotlib Axes to draw on (plt is assumed to be
# matplotlib.pyplot, as used elsewhere in this notebook)
test_df_age = test_df.select('Age')
fig, ax = plt.subplots()
hist(ax, test_df_age, bins=20, color="red")

# convert a PySpark DataFrame into a pandas DataFrame
# toPandas() collects every row to the driver, so convert a sample only;
# the 10% fraction and the seed are arbitrary choices
test_df_sample = test_df.sample(withReplacement=False, fraction=0.1, seed=42)
test_df_sample_pandas = test_df_sample.toPandas()
test_df_sample_pandas.hist('Age')

# pandas DataFrames vs PySpark DataFrames
# pandas = in memory on a single server, PySpark = distributed over many servers
# pandas = mutable, PySpark = immutable
# the pandas API supports more operations

# HandySpark
# NOTE: toHandy() is added to DataFrames by the third-party handyspark package
# (import handyspark), which this file does not import
hdf = test_df.toHandy()
hdf.cols['Age'].hist()

# PRACTICE
# column names of names_df
print("The column names of names_df are", names_df.columns)

# bring names_df to the driver as a pandas DataFrame
df_pandas = names_df.toPandas()

# horizontal bar chart of Age per Name
df_pandas.plot.barh(x='Name', y='Age', colormap='winter_r')
plt.show()

# Load the FIFA dataset: the first line holds the column names, types are inferred
fifa_df = spark.read.csv(file_path, header=True, inferSchema=True)

# column names and inferred types
fifa_df.printSchema()

# first 10 rows
fifa_df.show(10)

# total number of rows
print("There are {} rows in the fifa_df DataFrame".format(fifa_df.count()))

# expose fifa_df to SQL under the name fifa_df_table
fifa_df.createOrReplaceTempView('fifa_df_table')

# keep the rows whose Nationality is Germany and summarise them
query = '''SELECT * FROM fifa_df_table WHERE Nationality == "Germany"'''
fifa_df_germany_age = spark.sql(query)
fifa_df_germany_age.describe().show()


# PySpark MLlib - a built-in library for scalable machine learning

# Machine learning: the construction and study of algorithms that learn from data
# PySpark MLlib provides tools such as ML algorithms (collaborative filtering,
# classification, clustering)
# Scikit-Learn is a popular, easy-to-use Python library for machine learning
# and data mining, but it runs on a single machine, so the data must fit there
# PySpark MLlib algorithms are designed for parallel processing on a cluster;
# MLlib also has Java, Scala and R APIs
# Spark's ML library also offers a high-level API (pyspark.ml) to build ML pipelines
# Classification (binary and multiclass) and regression: linear SVMs, logistic
# regression, decision trees, random forests, gradient-boosted trees,
# naive Bayes, linear least squares, Lasso, ridge regression, isotonic regression
# Collaborative filtering: alternating least squares (ALS)
# Clustering: K-means, Gaussian mixture, bisecting K-means and streaming K-means
# Three key areas (three C's) of machine learning in PySpark MLlib:
# Collaborative filtering, Classification, Clustering

# PySpark MLlib imports: each area lives in its own submodule
# pyspark.mllib.recommendation -> collaborative filtering (ALS)
from pyspark.mllib.recommendation import ALS

# pyspark.mllib.classification -> classification
from pyspark.mllib.classification import LogisticRegressionWithLBFGS

# pyspark.mllib.clustering -> clustering
from pyspark.mllib.clustering import KMeans

# Import the library for ALS
from pyspark.mllib.recommendation import ALS

# Import the library for Logistic Regression
from pyspark.mllib.classification import LogisticRegressionWithLBFGS

# Import the library for Kmeans
from pyspark.mllib.clustering import KMeans

# COLLABORATIVE FILTERING
# making automatic predictions about a user's interests by collecting the
# preferences of many other users
# 2 approaches:
# user-user collaborative filtering: find users similar to the target user
# item-item collaborative filtering: find items similar to the items the
# target user is interested in

# Rating (from the pyspark.mllib.recommendation submodule) wraps a
# (user, product, rating) tuple; handy when parsing RDDs into rating records
from pyspark.mllib.recommendation import Rating
r = Rating(1, 2, 5.0)

# randomSplit() splits the data (e.g. into training and test sets) so that the
# predictive model can be evaluated on data it was not trained on

# PRACTICE
# Load the data into an RDD, one line per element
# (presumably "user,product,rating,..." lines -- see the parsing below)
data = sc.textFile(file_path)

# Split every line on commas
ratings = data.map(lambda line: line.split(','))

# Turn the first three fields into Rating(user, product, rating) records
ratings_final = ratings.map(lambda line: Rating(int(line[0]), int(line[1]), float(line[2])))

# 80% / 20% random split into training and test data
training_data, test_data = ratings_final.randomSplit([0.8, 0.2])

# Train an ALS model with rank 10 for 10 iterations
model = ALS.train(training_data, rank=10, iterations=10)

# Keep only the (user, product) part of each test record
testdata_no_rating = test_data.map(lambda p: (p[0], p[1]))

# Predict a rating for every (user, product) test pair
predictions = model.predictAll(testdata_no_rating)

# Print the first rows of the predictions RDD
print(predictions.take(2))

# Key the actual TEST ratings by (user, product); joining against test_data
# rather than the full ratings_final keeps the join small and ensures the MSE
# only compares test ratings with their predictions
rates = test_data.map(lambda r: ((r[0], r[1]), r[2]))

# Key the predictions the same way
preds = predictions.map(lambda r: ((r[0], r[1]), r[2]))

# Pair each test rating with its prediction: ((user, product), (actual, predicted))
rates_and_preds = rates.join(preds)

# Mean squared error between actual and predicted ratings
MSE = rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean()
print("Mean Squared Error of the model for the test data = {:.2f}".format(MSE))

## Classification

In [None]:
# CLASSIFICATION
# Classification is supervised machine learning that sorts input data into
# different categories
# Binary vs multiclass classification
# Algorithms: linear SVMs, logistic regression, decision trees, random forests,
# gradient-boosted trees, naive Bayes, etc.
# Logistic regression predicts a binary response: the output is 0 or 1; with
# the default threshold, a predicted probability above 50% gives 1
# Working with Vectors and LabeledPoints
from pyspark.mllib.classification import LogisticRegressionWithLBFGS
from pyspark.mllib.feature import HashingTF
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint

# Vectors: dense and sparse
# Dense vectors store all of their entries in an array of floating point numbers
# Sparse vectors store only the nonzero values and their indices
denseVec = Vectors.dense([1.0, 2.0, 3.0])
sparseVec = Vectors.sparse(4, {1: 1.0, 3: 5.5})

# LabeledPoint wraps a label (the value to predict) together with a feature vector
positive = LabeledPoint(1.0, [1.0, 0.0, 3.0])
negative = LabeledPoint(0.0, [2.0, 1.0, 1.0])

# HashingTF() computes a term-frequency vector of a fixed size, mapping each
# term to an index of the feature vector by hashing it
sentence = "hello hello world"
words = sentence.split()
tf = HashingTF(10000)
print(tf.transform(words))

# Logistic regression with LogisticRegressionWithLBFGS
# the minimum input for LogisticRegressionWithLBFGS.train() is an RDD of LabeledPoint
data = [
    LabeledPoint(0.0, [0.0, 1.0]),
    LabeledPoint(1.0, [1.0, 0.0]),
]
RDD = sc.parallelize(data)
lrm = LogisticRegressionWithLBFGS.train(RDD)
print(lrm.predict([1.0, 0.0]))
print(lrm.predict([0.0, 1.0]))

# PRACTICE: spam classifier
from pyspark.mllib.regression import LabeledPoint

# Load the datasets into RDDs, one line per element (presumably one email per line)
spam_rdd = sc.textFile(file_path_spam)
non_spam_rdd = sc.textFile(file_path_non_spam)

# Split the email messages into words
spam_words = spam_rdd.flatMap(lambda email: email.split(' '))
non_spam_words = non_spam_rdd.flatMap(lambda email: email.split(' '))

# Print the first element in the split RDD
print("The first element in spam_words is", spam_words.first())
print("The first element in non_spam_words is", non_spam_words.first())

# Create a HashingTF instance with 200 features
tf = HashingTF(numFeatures=200)

# Turn every email into one term-frequency vector over its words.
# HashingTF.transform() iterates over the terms of each document; fed the
# flattened word RDD, each "document" would be a single string and would be
# hashed character by character, so pass each email's list of words instead
spam_features = tf.transform(spam_rdd.map(lambda email: email.split(' ')))
non_spam_features = tf.transform(non_spam_rdd.map(lambda email: email.split(' ')))

# Label the features: 1 for spam, 0 for non-spam
spam_samples = spam_features.map(lambda features: LabeledPoint(1, features))
non_spam_samples = non_spam_features.map(lambda features: LabeledPoint(0, features))

# Combine the two datasets: union() stacks the samples of both classes, while
# join() matches (key, value) pairs by key and cannot combine LabeledPoints
samples = spam_samples.union(non_spam_samples)

# Split the data into training and testing
train_samples, test_samples = samples.randomSplit([0.8, 0.2])

# Train the model
model = LogisticRegressionWithLBFGS.train(train_samples)

# Create a prediction label from the test data
predictions = model.predict(test_samples.map(lambda x: x.features))

# Combine original labels with the predicted labels
labels_and_preds = test_samples.map(lambda x: x.label).zip(predictions)

# Check the accuracy of the model on the test data
accuracy = labels_and_preds.filter(lambda x: x[0] == x[1]).count() / float(test_samples.count())
print("Model accuracy : {:.2f}".format(accuracy))

## Clustering

In [None]:
# CLUSTERING
# Clustering is an unsupervised method that groups unlabeled data:
# similar objects end up in the same cluster, no labels are needed
# Algorithms: K-means, Gaussian mixture, Power Iteration Clustering (PIC),
# bisecting K-means, streaming K-means

# K-means
# each line of Data.csv becomes a [float, float] point built from its first
# two comma-separated fields
RDD = (sc.textFile("Data.csv")
         .map(lambda row: row.split(","))
         .map(lambda fields: [float(fields[0]), float(fields[1])]))
RDD.take(5)

from pyspark.mllib.clustering import KMeans
model = KMeans.train(RDD, k=2, maxIterations=10)
model.clusterCenters

# evaluate the model by computing the error function
from math import sqrt
def error(point):
    """Return the Euclidean distance between *point* and its cluster center.

    The center is the one that the module-level ``model`` (the trained K-means
    model) assigns *point* to. The model exposes its centers as
    ``clusterCenters``; it has no ``center`` attribute.
    This is the distance itself, not its square.
    """
    center = model.clusterCenters[model.predict(point)]
    return sqrt(sum(x ** 2 for x in (point - center)))

# add up the per-point errors: map() computes error() for every point, then
# reduce() sums the results on the cluster
# (error() returns plain distances, so this is a sum of distances rather than
# of squared distances; model.computeCost(RDD) gives the squared version)
WSSSE = RDD.map(lambda point: error(point)).reduce(lambda x, y: x + y)
print("Within Set Sum of Square Error = " + str(WSSSE))

# Visualizing K-means
# the points as a two-column Spark DataFrame, then as a pandas DataFrame for
# plotting (toPandas() is a DataFrame method, not a SparkSession one)
data_df = spark.createDataFrame(RDD, schema=["col1", "col2"])
data_df_pandas = data_df.toPandas()

# the cluster centers as a pandas DataFrame with the same column names
cluster_center_pandas = pd.DataFrame(model.clusterCenters, columns=["col1", "col2"])
print(cluster_center_pandas.head())

# the data points, with the cluster centers overlaid as red crosses
plt.scatter(data_df_pandas["col1"], data_df_pandas["col2"])
plt.scatter(cluster_center_pandas["col1"], cluster_center_pandas["col2"], color="red", marker="x")

# PRACTICE
# Load the dataset into an RDD
clusterRDD = sc.textFile(file_path)

# Split each line on tabs
rdd_split = clusterRDD.map(lambda x: x.split('\t'))

# Turn the first two fields of every line into an [int, int] point
rdd_split_int = rdd_split.map(lambda x: [int(x[0]), int(x[1])])

# Count the number of rows in the RDD
print("There are {} rows in the rdd_split_int dataset".format(rdd_split_int.count()))

# Train a model for every k from 13 to 16 and compute its WSSSE.
# error() looks up the global model, so each reduce() evaluates the model that
# was trained just before it in the same iteration
for clst in range(13, 17):
    model = KMeans.train(rdd_split_int, clst, seed=1)
    WSSSE = rdd_split_int.map(lambda point: error(point)).reduce(lambda x, y: x + y)
    print("The cluster {} has Within Set Sum of Squared Error {}".format(clst, WSSSE))

# Train the model again with the chosen k (15, picked from the WSSSE values above)
model = KMeans.train(rdd_split_int, k=15, seed=1)

# Get cluster centers
cluster_centers = model.clusterCenters

# Convert rdd_split_int RDD into a Spark DataFrame
rdd_split_int_df = spark.createDataFrame(rdd_split_int, schema=["col1", "col2"])

# Convert the Spark DataFrame into a pandas DataFrame
rdd_split_int_df_pandas = rdd_split_int_df.toPandas()

# Convert the cluster centers (not the data points) into a pandas DataFrame
cluster_centers_pandas = pd.DataFrame(cluster_centers, columns=["col1", "col2"])

# Overlaid scatter plot: the data points, with the centers as red crosses
plt.scatter(rdd_split_int_df_pandas["col1"], rdd_split_int_df_pandas["col2"])
plt.scatter(cluster_centers_pandas["col1"], cluster_centers_pandas["col2"], color="red", marker="x")
plt.show()