# Our first Spark script

In [3]:
# findspark.init() has to run before the first `import pyspark` in the next cell.
# NOTE(review): per the findspark docs it locates the local Spark installation
# and puts it on sys.path - confirm this is still needed in your environment.
import findspark
findspark.init()

In [4]:
import pyspark

# Create the SparkContext only once; re-running this cell reuses the existing one.
if 'sc' not in globals():
    conf = pyspark.SparkConf()
    # NOTE: 'c:/data' is a machine-specific path; adjust it when running elsewhere.
    conf.set('spark.local.dir', 'c:/data')
    sc = pyspark.SparkContext(conf=conf)

# Read the file as an RDD of lines, measure each line, and add the lengths up.
lines = sc.textFile('data/iris.data')
linelengths = lines.map(len)
totallength = linelengths.reduce(lambda total, length: total + length)
print(totallength)

4400


# Spark word count

In [5]:
import pyspark

# Reuse the SparkContext when an earlier cell has already created it.
if 'sc' not in globals():
    sc = pyspark.SparkContext()

lines = sc.textFile('data/iris.data')

# Split every line at the commas, pair each token with 1, then sum the 1s per token.
counts = (
    lines.flatMap(lambda row: row.split(","))
    .map(lambda token: (token, 1))
    .reduceByKey(lambda left, right: left + right)
)

# collect() brings all (token, count) pairs back to the driver for printing.
for pair in counts.collect():
    print(pair)

('0.2', 28)
('Iris-setosa', 50)
('1.3', 20)
('3.1', 12)
('1.5', 26)
('5.0', 14)
('3.6', 4)
('5.4', 8)
('3.9', 5)
('1.7', 6)
('0.3', 7)
('2.9', 10)
('0.1', 6)
('4.8', 9)
('1.0', 8)
('0.5', 1)
('1.9', 7)
('4.2', 5)
('2.3', 12)
('0.6', 1)
('7.0', 1)
('6.4', 8)
('6.5', 5)
('2.4', 6)
('6.6', 3)
('2.7', 9)
('2.0', 7)
('5.9', 5)
('6.0', 8)
('2.2', 6)
('6.7', 10)
('6.2', 4)
('6.8', 3)
('2.6', 5)
('Iris-virginica', 50)
('7.1', 1)
('7.6', 1)
('7.3', 1)
('7.2', 3)
('7.7', 4)
('5.1', 17)
('3.5', 8)
('1.4', 20)
('4.9', 11)
('3.0', 27)
('4.7', 7)
('3.2', 13)
('4.6', 7)
('0.4', 7)
('3.4', 12)
('4.4', 8)
('3.7', 4)
('1.6', 11)
('4.3', 3)
('1.1', 4)
('5.8', 10)
('4.0', 6)
('1.2', 7)
('5.7', 11)
('3.8', 7)
('3.3', 8)
('5.2', 6)
('4.1', 4)
('5.5', 10)
('4.5', 9)
('5.3', 3)
('Iris-versicolor', 50)
('6.9', 5)
('2.8', 14)
('6.3', 10)
('6.1', 9)
('5.6', 12)
('2.5', 11)
('1.8', 12)
('2.1', 6)
('7.4', 1)
('7.9', 1)


# Sorted word count

In [6]:
# Count every comma-separated token of `lines`, then order the pairs by token.
tokens = lines.flatMap(lambda row: row.split(","))
counts = (
    tokens.map(lambda token: (token, 1))
    .reduceByKey(lambda left, right: left + right)
    .sortByKey()
)

for pair in counts.collect():
    print(pair)

('0.1', 6)
('0.2', 28)
('0.3', 7)
('0.4', 7)
('0.5', 1)
('0.6', 1)
('1.0', 8)
('1.1', 4)
('1.2', 7)
('1.3', 20)
('1.4', 20)
('1.5', 26)
('1.6', 11)
('1.7', 6)
('1.8', 12)
('1.9', 7)
('2.0', 7)
('2.1', 6)
('2.2', 6)
('2.3', 12)
('2.4', 6)
('2.5', 11)
('2.6', 5)
('2.7', 9)
('2.8', 14)
('2.9', 10)
('3.0', 27)
('3.1', 12)
('3.2', 13)
('3.3', 8)
('3.4', 12)
('3.5', 8)
('3.6', 4)
('3.7', 4)
('3.8', 7)
('3.9', 5)
('4.0', 6)
('4.1', 4)
('4.2', 5)
('4.3', 3)
('4.4', 8)
('4.5', 9)
('4.6', 7)
('4.7', 7)
('4.8', 9)
('4.9', 11)
('5.0', 14)
('5.1', 17)
('5.2', 6)
('5.3', 3)
('5.4', 8)
('5.5', 10)
('5.6', 12)
('5.7', 11)
('5.8', 10)
('5.9', 5)
('6.0', 8)
('6.1', 9)
('6.2', 4)
('6.3', 10)
('6.4', 8)
('6.5', 5)
('6.6', 3)
('6.7', 10)
('6.8', 3)
('6.9', 5)
('7.0', 1)
('7.1', 1)
('7.2', 3)
('7.3', 1)
('7.4', 1)
('7.6', 1)
('7.7', 4)
('7.9', 1)
('Iris-setosa', 50)
('Iris-versicolor', 50)
('Iris-virginica', 50)


# Estimate Pi

In [7]:
import pyspark
import random

# Number of random points drawn for the estimate below; it is also the divisor
# used when turning the hit count into the printed value.
NUM_SAMPLES = 1000

def sample(p):
    """Draw one random point in the unit square and test it against the unit circle.

    Args:
        p: ignored; present only so the function can be handed to ``map``.

    Returns:
        1 when the point satisfies x*x + y*y < 1, otherwise 0.
    """
    x = random.random()
    y = random.random()
    inside = x * x + y * y < 1
    return int(inside)

# Turn each of the NUM_SAMPLES indices into a 0/1 hit, then add the hits up.
hits = sc.parallelize(range(NUM_SAMPLES)).map(sample)
count = hits.reduce(lambda left, right: left + right)

# The printed estimate is four times the fraction of hits.
print("Pi is roughly %f" % (4.0 * count / NUM_SAMPLES))

Pi is roughly 3.056000


# Log file examination

In [8]:
# Load the access log as an RDD of lines.  Four separate actions scan it below,
# so cache it instead of re-reading the file for every count.
textFile = sc.textFile('data/access_log').cache()

print(textFile.count(), 'access records')

# NOTE: these are plain substring tests on the whole line, so a line that
# happens to contain both 'GET' and 'POST' is counted in both groups.
gets = textFile.filter(lambda line: 'GET' in line)
print(gets.count(), 'GET')

posts = textFile.filter(lambda line: 'POST' in line)
print(posts.count(), 'POST')

# Lines that are neither GET nor POST.  One filter selects exactly the lines
# that textFile.subtract(gets).subtract(posts) would keep, without the two
# shuffles that subtract() needs.
other = textFile.filter(lambda line: 'GET' not in line and 'POST' not in line)
print(other.count(), 'Others')

1546 access records
1525 GET
14 POST
7 Others


# Spark primes

In [9]:
def is_it_prime(number):
    """Return True when ``abs(int(number))`` is a prime number.

    The argument is converted with ``int`` and its sign is dropped before
    testing, so ``-7`` and ``"13"`` are both reported as prime.
    """
    candidate = abs(int(number))

    # 0 and 1 are not prime
    if candidate < 2:
        return False

    # 2 is the only even prime
    if candidate % 2 == 0:
        return candidate == 2

    # an odd candidate is prime unless some odd number up to its square root divides it
    limit = int(candidate ** 0.5) + 1
    return all(candidate % divisor for divisor in range(3, limit, 2))

# distribute the integers 0..99,999 as an RDD
numbers = sc.parallelize(range(100000))

# keep only the primes and print how many were found
prime_count = numbers.filter(is_it_prime).count()
print(prime_count)

9592


# Spark text file analysis

In [5]:
# glom() gathers the lines of each partition into one list; join them back into
# a single text per partition and cut that text at every full stop.
sentences = (
    sc.textFile('data/2600raid.txt')
    .glom()
    .map(lambda partition_lines: " ".join(partition_lines))
    .flatMap(lambda text: text.split("."))
)

print(sentences.count(),"sentences")

# Emit ((word, next_word), 1) for every pair of adjacent words in a sentence.
bigrams = (
    sentences.map(lambda sentence: sentence.split())
    .flatMap(lambda words: [(pair, 1) for pair in zip(words, words[1:])])
)

print(bigrams.count(),"bigrams")

# Sum the occurrences per bigram, flip each pair to (count, bigram) and sort
# by count in descending order.
frequent_bigrams = (
    bigrams.reduceByKey(lambda left, right: left + right)
    .map(lambda kv: (kv[1], kv[0]))
    .sortByKey(False)
)

frequent_bigrams.take(10)

220 sentences
3463 bigrams


[(36, ('of', 'the')),
 (15, ('the', 'mall')),
 (12, ('to', 'the')),
 (12, ('on', 'the')),
 (12, ('At', 'this')),
 (11, ('the', 'guards')),
 (11, ('at', 'the')),
 (11, ('in', 'the')),
 (9, ('a', 'few')),
 (9, ('and', 'the'))]

# Spark - evaluating history data

In [13]:
import pyspark
import csv
import operator
import itertools
import collections

# Tally how often each year, occupation and guest appears in the CSV.
# Counter replaces the manual "if key in dict ... else ..." bookkeeping.
years = collections.Counter()
occupations = collections.Counter()
guests = collections.Counter()

#The file header contains these column descriptors
#YEAR,GoogleKnowlege_Occupation,Show,Group,Raw_Guest_List
# newline='' lets the csv module do its own newline handling, as its
# documentation requires for files passed to a reader.
with open('data/daily_show_guests.csv', newline='') as csvfile:
    for row in csv.DictReader(csvfile):
        years[row['YEAR']] += 1
        # NOTE: values are counted exactly as written, so 'journalist' and
        # 'Journalist' are separate entries (both show up in the output).
        occupations[row['GoogleKnowlege_Occupation']] += 1
        guests[row['Raw_Guest_List']] += 1

# most_common() returns (value, count) pairs sorted by count, highest first;
# equal counts keep the order in which the values were first seen.
syears = years.most_common()
soccupations = occupations.most_common()
sguests = guests.most_common()

print(syears[:5])
print(soccupations[:5])
print(sguests[:5])

[('2000', 169), ('1999', 166), ('2003', 166), ('2013', 166), ('2010', 165)]
[('actor', 596), ('actress', 271), ('journalist', 180), ('author', 102), ('Journalist', 72)]
[('Fareed Zakaria', 19), ('Denis Leary', 17), ('Brian Williams', 16), ('Paul Rudd', 13), ('Ricky Gervais', 13)]
