In [0]:
import matplotlib as mpl
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt

%matplotlib inline

pd.set_option('display.width', 500)
pd.set_option('display.max_columns', 100)

In [0]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://www-us.apache.org/dist/spark/spark-2.4.3/spark-2.4.3-bin-hadoop2.7.tgz
!tar xf spark-2.4.3-bin-hadoop2.7.tgz
!pip install -q findspark

In [0]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.3-bin-hadoop2.7"

In [60]:
# import findspark
# findspark.init()
# from pyspark.sql import SparkSession
# spark = SparkSession.builder.master("local[*]").getOrCreate()

import findspark
findspark.init()
import pyspark
sc = pyspark.SparkContext()

ValueError: ignored

In [0]:
sc

In [0]:
sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]).map(lambda x : x**2).sum()

# CREATE A RDD

In [0]:
wordsList = ['cat', 'elephant', 'rat', 'rat', 'cat']
wordsRDD = sc.parallelize(wordsList, 4)
print(type(wordsRDD))

In [0]:
?sc.parallelize()

In [0]:
sc.parallelize([0, 2, 3, 4, 6], 5).glom().collect()

In [0]:
sc.parallelize(range(0, 6, 2), 5).glom().collect()

In [0]:
wordsRDD.collect()

# OPERATIONS ON RDD

**RDDs support two types of operations: transformations, which create a new dataset from an existing one, and actions, which return a value to the driver program after running a computation on the dataset. For example, map is a transformation that passes each dataset element through a function and returns a new RDD representing the results. On the other hand, reduce is an action that aggregates all the elements of the RDD using some function and returns the final result to the driver program (although there is also a parallel reduceByKey that returns a distributed dataset).**

**Word Examples**

In [0]:
def makePlural(word):
  return word + 's'

print(makePlural('cat'))

Transform one RDD into another

In [0]:
pluralRDD = wordsRDD.map(makePlural)
print(pluralRDD.first())
print(pluralRDD.take(2))

In [0]:
pluralRDD.take(1)

In [0]:
pluralRDD.collect()

**Key Value Pairs**

In [0]:
wordPairs = wordsRDD.map(lambda w : (w, 1))
print(wordPairs.collect())

# WORD COUNT

In [0]:
wordsList = ['cat', 'elephant', 'rat', 'rat', 'cat']
wordsRDD = sc.parallelize(wordsList, 4)
wordCountsCollected = (wordsRDD.map(lambda w : (w, 1)).reduceByKey(lambda x,y : x+y).collect())
print(wordCountsCollected)

In [0]:
print(wordsRDD.map(lambda w : (w, 1)).reduceByKey(lambda x,y : x+y).toDebugString())

# Using Cache

In [0]:
wordsList = ['cat', 'elephant', 'rat', 'rat', 'cat']
wordsRDD = sc.parallelize(wordsList, 4)
print(wordsRDD)
wordsRDD.count()

In [0]:
wordsRDD.count()

In [0]:
wordsRDD.cache()

In [0]:
wordsRDD.count()

In [0]:
wordsRDD.count()

**Where is this useful: it is when you have branching parts or loops, so that you dont do things again and again. Spark, being "lazy" will rerun the chain again. So cache or persist serves as a checkpoint, breaking the RDD chain or the lineage.**

In [0]:
birdsList = ['heron', 'owl']
animList = wordsList + birdsList

animaldict = {}
for e in wordsList:
  animaldict[e] = 'mammal'
for e in birdsList:
  animaldict[e] = 'bird'
  
animaldict

In [0]:
animsrdd = sc.parallelize(animList, 4)
animsrdd.cache()

mammalcount = animsrdd.filter(lambda w : animaldict[w] == 'mammal').count()
birdcount = animsrdd.filter(lambda w : animaldict[w] == 'bird').count()

print(mammalcount, birdcount)

In [0]:
stopwords = [e.strip() for e in open('english.stop.txt').readlines()]

In [0]:
juliusrdd = sc.textFile('juliuscaesar.txt')

In [0]:
juliusrdd.flatMap(lambda line : line.split()).count()

In [0]:
(juliusrdd.flatMap(lambda line : line.split()).map(lambda word : word.strip().lower()).collect())

In [0]:
(juliusrdd.flatMap(lambda line : line.split()).map(lambda word : word.strip().lower()).take(20))

In [0]:
(juliusrdd.flatMap(lambda line : line.split()).map(lambda word : word.strip().lower()).filter(lambda word : word not in stopwords).collect())

In [0]:
(juliusrdd.flatMap(lambda line : line.split()).map(lambda word : word.strip().lower()).filter(lambda word : word not in stopwords).take(20))

In [0]:
(juliusrdd.flatMap(lambda line : line.split()).map(lambda word : word.strip().lower()).filter(lambda word : word not in stopwords).map(lambda word : (word, 1)).collect())

In [0]:
(juliusrdd.flatMap(lambda line : line.split()).map(lambda word : word.strip().lower()).filter(lambda word : word not in stopwords).map(lambda word : (word, 1)).take(20))

In [0]:
(juliusrdd.flatMap(lambda line : line.split()).map(lambda word : word.strip().lower()).filter(lambda word : word not in stopwords).map(lambda word : (word, 1)).reduceByKey(lambda a,b : a + b).collect())

In [0]:
(juliusrdd.flatMap(lambda line : line.split()).map(lambda word : word.strip().lower()).filter(lambda word : word not in stopwords).map(lambda word : (word, 1)).reduceByKey(lambda a,b : a + b).take(20))

In [0]:
(juliusrdd.flatMap(lambda line : line.split()).map(lambda word : word.strip().lower()).filter(lambda word : word not in stopwords).map(lambda word : (word, 1)).reduceByKey(lambda a,b : a + b).takeOrdered(20, lambda x : -x[1]))

In [0]:
captions, counts = zip(*juliusrdd.flatMap(lambda line : line.split()).map(lambda word : word.strip().lower()).filter(lambda word : word not in stopwords).map(lambda word : (word, 1)).reduceByKey(lambda a,b : a + b).takeOrdered(20, lambda x : -x[1]))

pos = np.arange(len(counts))
plt.bar(pos, counts)
plt.xticks(pos+0.4, captions, rotation=90)

In [0]:
from google.colab import files
upload = files.upload()

In [0]:
shakesrdd = sc.textFile('*.txt', minPartitions = 4)

In [0]:
shakesrdd.collect()

In [0]:
shakesrdd.take(10)

In [0]:
(shakesrdd.flatMap(lambda line : line.split()).map(lambda word : word.strip().lower()).filter(lambda word : word not in stopwords).map(lambda word : (word, 1)).reduceByKey(lambda a,b : a+b).sortByKey(0,1).take(30))

In [0]:
from google.colab import files
upload = files.upload()

In [0]:
df = pd.read_csv('01_heights_weights_genders.csv')
df.head()

In [0]:
from pyspark.sql import SQLContext
sqlsc = SQLContext(sc)
sparkdf = sqlsc.createDataFrame(df)

In [0]:
sparkdf

In [0]:
sparkdf.show(10)

In [0]:
type(sparkdf.Gender)

In [0]:
temp = sparkdf.rdd.map(lambda r : r.Gender)
print(type(temp))
temp.take(10)

# Machine Learning

In [0]:
from pyspark.mllib.classification import LogisticRegressionWithLBFGS
from pyspark.mllib.regression import LabeledPoint

data = sparkdf.rdd.map(lambda row : LabeledPoint(row.Gender == 'Male', [row.Height, row.Weight]))
data.take(5)

In [0]:
data2 = sparkdf.rdd.map(lambda row : LabeledPoint(row[0] == 'Male', row[1:]))
data2.take(5)[0].label, data2.take(5)[1].features

In [0]:
train, test = data.randomSplit([0.7, 0.3])
train.cache()
test.cache()

In [0]:
type(train)

In [0]:
type(test)

In [0]:
model = LogisticRegressionWithLBFGS.train(train)

In [0]:
model.weights

In [0]:
results = test.map(lambda lp : (lp.label, float(model.predict(lp.features))))

In [0]:
results.take(10)

In [0]:
type(results)

In [0]:
from pyspark.mllib.evaluation import BinaryClassificationMetrics
metrics = BinaryClassificationMetrics(results)

In [0]:
print(type(metrics))
metrics.areaUnderROC

In [0]:
type(model)

In [0]:
!rm -rf mylogistic.model

In [0]:
sc.stop()