In [1]:
from pyspark import SparkConf, SparkContext
from time import sleep, time

# Reuse the running SparkContext when this cell is executed again; calling the
# SparkContext constructor a second time in the same kernel raises an error.
sc = SparkContext.getOrCreate(
    SparkConf().setMaster('local[*]').setAppName('pyspark tutorial'))

# A decorator: timer 
https://codereview.stackexchange.com/questions/169870/decorator-to-measure-execution-time-of-a-function

In [43]:
from time import time
def timer(f):
    """Decorator that prints how long a call to ``f`` takes.

    The message is printed once per outermost call: when ``f`` calls itself
    recursively through the decorated name, the inner calls are timed silently.
    Nothing is printed if ``f`` raises; the exception propagates unchanged.

    The nesting counter is shared by every caller of the wrapper, so it is not
    suitable for timing the same function from several threads at once.
    """
    from functools import wraps

    # Number of calls to `wrapper` currently in progress. A one-element list is
    # used so the closure can update it (Python 2 has no `nonlocal`).
    active_calls = [0]

    @wraps(f)  # keep f.__name__ / f.__doc__ on the decorated function
    def wrapper(*args, **kwargs):
        # 1. what are *args and **kwargs  ?
        active_calls[0] += 1
        start = time()
        try:
            result = f(*args, **kwargs)
        finally:
            # Always unwind the counter, even when f raises.
            active_calls[0] -= 1
        end = time()
        if active_calls[0] == 0:
            print('The function {0} took {1} seconds to return.'.format(f.__name__, end-start))
            # 2. what is .format ?
            # 3. what is f.__name__ ?
        return result
    return wrapper

1. https://www.programiz.com/python-programming/args-and-kwargs
   - args (non-keyword arguments) is a tuple
   - kwargs (keyword arguments) is a dictionary
2. https://pyformat.info/
3. https://stackoverflow.com/questions/251464/how-to-get-a-function-name-as-a-string-in-python

# Evaluating a function on multiple inputs in parallel

In [34]:
# Number of inputs to evaluate; range(_limit) gives the integers 0 .. _limit - 1.
_limit = 8
my_list = range(_limit)

def fun(x, delay=5):
    """Return ``x`` squared after sleeping for ``delay`` seconds.

    The sleep makes every call slow, presumably so that the benefit of
    evaluating several inputs in parallel shows up in the timings.

    Args:
        x: value to square.
        delay: seconds to sleep before returning (default 5).
    """
    sleep(delay)
    return x*x

@timer
def evaluate_parallel(my_list):
    """Apply ``fun`` to every element of ``my_list`` through a Spark RDD.

    Returns the collected results as a Python list.
    """
    # collect() already returns a list, so no extra list() copy is needed, and
    # `fun` can be handed to map() directly instead of through a lambda.
    return sc.parallelize(my_list).map(fun).collect()

In [33]:
# Run the timed Spark job on my_list; the timer decorator prints the elapsed time.
evaluate_parallel(my_list)

the function evaluate_parallel took 10.0906891823 time


[0, 1, 4, 9, 16, 25, 36, 49]

# Apply a filter on a parallel collection

In [45]:
from random import random
# Number of random points drawn for the pi estimate in get_pi().
_num_samples = 1000

def is_inside(p):
    """Draw a random point in the unit square and test it against the unit circle.

    The argument ``p`` is not used. Each call sleeps 0.01 s before drawing the
    point, and returns True when the point's squared distance from the origin
    is below 1.
    """
    sleep(0.01)
    px = random()
    py = random()
    squared_radius = px*px + py*py
    return squared_radius < 1

@timer
def get_pi():
    """Print a Monte Carlo estimate of pi based on ``_num_samples`` random points."""
    sample_ids = sc.parallelize(xrange(0, _num_samples))  # 1. what is xrange ?
    # Only the elements for which is_inside() returns True are counted.
    hits = sample_ids.filter(is_inside).count()
    print("Pi is roughly %f" % (4.0 * hits / _num_samples))

1. range vs xrange
    - https://www.geeksforgeeks.org/range-vs-xrange-python/
    - https://stackoverflow.com/questions/94935/what-is-the-difference-between-range-and-xrange-functions-in-python-2-x

In [46]:
# Prints the pi estimate, followed by the timing line from the timer decorator.
get_pi()

Pi is roughly 3.120000
The function get_pi took 2.86020493507 seconds to return.


# The pagerank algorithm 
- https://apache.googlesource.com/spark/+/master/examples/src/main/python/pagerank.py
- https://developers.soundcloud.com/blog/pagerank-in-spark
- https://spark.apache.org/docs/latest/quick-start.html
- https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-rdd-caching.html

https://www.datacamp.com/community/tutorials/networkx-python-graph-tutorial

In [47]:
# NOTE(review): my_rdd is not used by any of the cells visible after this one — confirm it is still needed.
my_rdd = sc.parallelize(my_list)

In [82]:
# Load graph.txt through Spark; get_pair() below reads the first two
# whitespace-separated tokens of each line.
lines = sc.textFile('graph.txt')
# 1. what is the difference between sc.parallelize() and sc.textFile() ?

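1. sc.textFile() also creates an RDD, but it reads the elements from a file (one element per line), whereas sc.parallelize() distributes a collection that already exists in the driver program.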
   - https://stackoverflow.com/questions/44860973/what-are-the-differences-between-sc-parallelize-and-sc-textfile
   - https://www.quora.com/Is-my-understanding-to-parallel-operations-in-spark-correct
   - https://www.learningjournal.guru/article/apache-spark/apache-spark-parallel-processing/

In [73]:
def get_pair(line):
    """Return the first two whitespace-separated tokens of ``line`` as a tuple.

    Raises IndexError when ``line`` holds fewer than two tokens.
    """
    tokens = line.strip().split()
    first, second = tokens[0], tokens[1]
    return first, second

# Turn every line into a (first token, second token) pair, drop duplicate
# pairs, group the second tokens by the first, and cache the result because
# it is reused by the cells that follow.
links = (
    lines
    .map(get_pair)
    .distinct()
    .groupByKey()
    .cache()
)
links.collect()

[(u'1', <pyspark.resultiterable.ResultIterable at 0x1163356d0>),
 (u'3', <pyspark.resultiterable.ResultIterable at 0x116337f50>),
 (u'2', <pyspark.resultiterable.ResultIterable at 0x116362050>),
 (u'4', <pyspark.resultiterable.ResultIterable at 0x1163620d0>)]

In [74]:
# Give every key of `links` an initial rank of 1.0.
ranks = links.map(lambda key_and_group: (key_and_group[0], 1.0))
ranks.collect()

[(u'1', 1.0), (u'3', 1.0), (u'2', 1.0), (u'4', 1.0)]

In [85]:
# Pair each key's grouped values from `links` with that key's current rank.
contribs = links.join(ranks)
contribs.collect()

[(u'2',
  (<pyspark.resultiterable.ResultIterable at 0x116352dd0>,
   0.23153811726092793)),
 (u'4',
  (<pyspark.resultiterable.ResultIterable at 0x116352fd0>,
   0.3299418170968223)),
 (u'1',
  (<pyspark.resultiterable.ResultIterable at 0x116335c10>,
   0.38370878711024914)),
 (u'3',
  (<pyspark.resultiterable.ResultIterable at 0x116335710>,
   0.3299418170968223))]

In [86]:
# Number of rank-update rounds performed by the PageRank loop in this cell.
_num_iterations = 10

from operator import add

def computeContribs(urls, rank):
    """Yield one ``(url, share)`` pair for each entry of ``urls``.

    Every share is ``rank`` divided by the number of entries. Nothing is
    yielded, and no division takes place, when ``urls`` is empty.
    """
    fan_out = len(urls)
    for target in urls:
        share = rank / fan_out
        yield target, share

for iteration in range(_num_iterations):
    # Split every key's current rank evenly across the values grouped under it.
    contribs = links.join(ranks).flatMap(
        lambda entry: computeContribs(entry[1][0], entry[1][1]))
    # 1. what is flatMap ?

    # New rank of a key: 0.85 times the sum of the shares it received, plus 0.15.
    ranks = contribs.reduceByKey(add).mapValues(lambda total: 0.85 * total + 0.15)

        
# Collect every (link, rank) pair, ordered by the negated rank so the highest
# rank comes first, and print one line per link.
for (link, rank) in ranks.sortBy(lambda link_rank: -link_rank[1]).collect():
    print("%s has rank: %s." % (link, rank))
    
# 2. How to sort by ranks ?    
# 3. How to sort by values in descending order ?

1 has rank: 0.38370878711.
10 has rank: 0.371763389527.
3 has rank: 0.329941817097.
4 has rank: 0.329941817097.
6 has rank: 0.243483514844.
5 has rank: 0.243483514844.
2 has rank: 0.231538117261.


1. http://www.brunton-spall.co.uk/post/2011/12/02/map-map-and-flatmap-in-scala/
2. https://stackoverflow.com/questions/33706408/how-to-sort-by-value-efficiently-in-pyspark
3. https://stackoverflow.com/questions/30787635/takeordered-descending-pyspark

# k-means clustering

https://www.cs.helsinki.fi/u/lagerspe/courses/bigdata-spark-algorithms.pdf

http://www.archer.ac.uk/training/course-material/2018/02/data-an-belfast/Spark%20k-means%20walkthrough.pdf

In [133]:
import numpy as np

def generate_clusters(k, points_per_cluster=100, sigma=1):
    """Draw 1-D samples from ``k`` normal distributions and return them as one list.

    The mean is advanced cumulatively, so cluster ``i`` (0-based) is centred at
    ``10 * (0 + 1 + ... + i)``: 0, 10, 30, 60, 100, ... Samples are ordered
    cluster by cluster, and the result is empty when ``k`` is 0.

    Args:
        k: number of clusters to generate.
        points_per_cluster: number of samples drawn for each cluster.
        sigma: standard deviation shared by all clusters.

    Returns:
        A flat list holding ``k * points_per_cluster`` samples.
    """
    samples = []
    mu = 0
    for i in range(k):
        mu = mu + 10*i
        # extend() appends in place instead of rebuilding the list per cluster.
        samples.extend(np.random.normal(mu, sigma, points_per_cluster))
    return samples

# Build the demo data: samples for k clusters in one flat list.
# NOTE(review): no random seed is set in the cells visible here, so the samples differ between runs.
k = 5
samples = generate_clusters(k)

In [134]:
# NOTE(review): this displays the entire list; samples[:10] would keep the output short.
samples

[-1.6173757147586225,
 -0.7557763386263926,
 -0.00044457623099997964,
 -0.19685356835344597,
 -0.59152843735581795,
 -0.23194726081167855,
 1.5346842579959097,
 0.99921301821709874,
 -1.460374794252072,
 -0.17242380796102594,
 -1.3197190352499628,
 -0.54380080815495102,
 -2.7108645030938363,
 -0.32668037044326398,
 0.46201477552358344,
 0.95269887660313335,
 -0.21154510491855319,
 0.87629350324931266,
 -1.3202687763706058,
 -0.76397945224406327,
 -1.214955380557398,
 -0.61525724903848722,
 1.1203420773735047,
 -1.1267339245006667,
 -0.046861840903100367,
 0.98864295214249864,
 -0.036395432226228874,
 -2.1201405390910013,
 -1.6138331439942988,
 0.047839493834698876,
 -1.4497241444949318,
 0.84671447264778155,
 0.18026724059317278,
 -0.82069451384642167,
 1.2570866151155822,
 0.27528515047902624,
 0.9345571030203772,
 1.1964562254871214,
 0.19618364306189637,
 0.047795637662085631,
 0.55296426528265841,
 -1.0277284655320595,
 1.4001949605819248,
 1.8154324493710707,
 -0.00924709253317161

In [137]:
# Iteration budget for k_means_sequential().
# NOTE(review): the PageRank section binds the same name to 10; whichever cell ran last wins.
_num_iterations = 1000
# Large finite stand-in for an unbounded distance.
_infinity = 10000000

def distance(a, b):
    """Return the absolute difference between ``a`` and ``b``."""
    gap = a - b
    return abs(gap)

def get_closest_center(point, centers):
    """Return the key of the entry in ``centers`` whose value is nearest to ``point``.

    ``centers`` maps a cluster key to that cluster's centre. When two centres
    are equally near, the key iterated first wins. Returns -1 when ``centers``
    is empty.
    """
    # Start from a true infinity: with a finite sentinel the result stays at -1
    # whenever every centre is farther away than the sentinel.
    closest_distance = float('inf')
    closest_center = -1

    for center, position in centers.items():
        current_distance = distance(point, position)
        if current_distance < closest_distance:
            closest_distance = current_distance
            closest_center = center

    return closest_center
        
def update_clusters(centers, clusters, samples):
    """Assign every sample to the centre chosen by ``get_closest_center``.

    The ``clusters`` argument is ignored: a fresh mapping is built with one
    list per key of ``centers``, and a list stays empty when no sample picked
    that key.
    """
    assignment = dict((key, []) for key in centers)
    for sample in samples:
        nearest = get_closest_center(sample, centers)
        assignment[nearest].append(sample)
    return assignment

def update_centers(centers, clusters, samples):
    """Compute a new centre for every key of ``clusters``.

    A non-empty cluster gets the mean of its points; an empty cluster gets a
    value drawn uniformly between the smallest and the largest sample. The
    ``centers`` argument is ignored.
    """
    refreshed = {}
    for key, members in clusters.items():
        if len(members) == 0:
            # Nothing was assigned here: re-seed the centre at random.
            refreshed[key] = np.random.uniform(min(samples), max(samples))
        else:
            refreshed[key] = sum(members) / len(members)
    return refreshed
        

def k_means_sequential(samples, k, num_iterations=None):
    """Cluster the 1-D ``samples`` into ``k`` groups and return the centres.

    Centres start at random positions between the smallest and largest sample
    and are refined by alternating ``update_centers`` and ``update_clusters``.

    Args:
        samples: non-empty sequence of numbers.
        k: number of centres.
        num_iterations: upper bound on the number of refinement rounds;
            defaults to the module-level ``_num_iterations``.

    Returns:
        A dict mapping each cluster index ``0 .. k-1`` to its centre.

    Raises:
        ValueError: from ``min()`` when ``samples`` is empty.
    """
    if num_iterations is None:
        num_iterations = _num_iterations

    sample_min = min(samples)
    sample_max = max(samples)

    centers = {}
    clusters = {}
    for i in range(k):
        centers[i] = np.random.uniform(sample_min, sample_max)
        clusters[i] = []

    for _ in range(num_iterations):
        new_centers = update_centers(centers, clusters, samples)
        # Unchanged centres would produce the same assignment and the same
        # means again, so the remaining rounds cannot change the result.
        if new_centers == centers:
            break
        centers = new_centers
        clusters = update_clusters(centers, clusters, samples)

    return centers
        


In [136]:
# Fit k centres to the generated samples; the result maps cluster index to centre.
k_means_sequential(samples, k)

{0: 29.885393433603483,
 1: 59.982244921237537,
 2: 100.05740106655657,
 3: -0.041622013524901046,
 4: 9.8909752168588287}

# Self-tuning spectral clustering
https://medium.com/@ArmandGrillet/parallel-self-tuning-spectral-clustering-on-apache-spark-3d95424b80f0