In [None]:
import json
import sys
import requests
from pyspark.sql import SparkSession
import os
from pyspark.sql.functions import lit
import socket
import time


In [None]:
# Resolve this host's IP address once and reuse it for both settings below.
driver_host = socket.gethostbyname(socket.gethostname())

# Object-store credentials are read from the environment so real secrets never
# have to be written into the notebook. The fallbacks are the demo values this
# notebook originally hard-coded; do not rely on them outside the demo setup.
s3_access_key = os.environ.get("S3_ACCESS_KEY", "user")
s3_secret_key = os.environ.get("S3_SECRET_KEY", "password")

spark = (
    SparkSession.builder
    .appName("WordCount_Jupyter")
    .master("spark://spark-cluster-master-svc.spark-cluster.svc.cluster.local:7077")
    # Hadoop S3A connector and AWS SDK jars, fetched when the session starts.
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.4,com.amazonaws:aws-java-sdk-bundle:1.11.1026")
    # Exposes this host's address to executor processes as the JUPYTER_HOST
    # environment variable.
    .config("spark.executorEnv.JUPYTER_HOST", driver_host)
    # Address the driver (this kernel) advertises to the rest of the cluster.
    .config("spark.driver.host", driver_host)
    # S3A access to the S3-compatible endpoint, using path-style bucket URLs.
    .config("spark.hadoop.fs.s3a.endpoint", "http://minio.minio.svc.cluster.local:9000")
    .config("spark.hadoop.fs.s3a.access.key", s3_access_key)
    .config("spark.hadoop.fs.s3a.secret.key", s3_secret_key)
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    # Executor sizing: one core and 2 GiB of memory per executor.
    .config("spark.executor.cores", "1")
    .config("spark.executor.memory", "2g")
    .enableHiveSupport()
    .getOrCreate()
)

# Initialize the spark context
sc = spark.sparkContext

# From now on we can open port 4040 to see the Spark web UI, including jobs, DAG, stages, executors, etc.

In [None]:
# Reach the JVM-side SparkContext behind the Python one.
sc2 = sc._jsc.sc()
# Collect the host of every entry the status tracker currently reports.
executor_hosts = [info.host() for info in sc2.statusTracker().getExecutorInfos()]
# One entry is subtracted from the total (presumably to exclude the driver's
# own entry from the count -- TODO confirm).
number_of_workers = len(executor_hosts) - 1

In [None]:
# Display the worker count computed in the previous cell.
number_of_workers

In [None]:
# Measure execution time with a monotonic clock, which is not affected by
# system clock adjustments while the job runs.
start = time.perf_counter()

# Define the path (glob pattern) to access the input data
input_dir = "s3a://demo/input/*"
# Load the input as an RDD with one element per line of text
input_files = sc.textFile(input_dir)

# Per line, create a list of all words. split() with no argument splits on any
# run of whitespace and never yields empty strings, so blank lines and repeated
# spaces are not counted as an empty "word".
words = input_files.flatMap(lambda line: line.split())
# Pair every word with 1 and sum the ones per distinct word
word_counts = words.map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)
# Sort words by the number of occurrences (ascending). The result is cached
# because two actions below (collect and saveAsTextFile) both consume it.
# NOTE: this name shadows the Python builtin `sorted`; it is kept because a
# later cell in this notebook still refers to it.
sorted = word_counts.sortBy(lambda pair: pair[1], ascending=True).cache()
# Bring the results from all workers back to the driver
final = sorted.collect()
# Iterate and print
for word, count in final:
    print("%s: %i" % (word, count))

# Output into a single partition on the S3 bucket.
# NOTE: saveAsTextFile fails if the output path already exists, so the previous
# output has to be removed before this cell is run again.
sorted.coalesce(1).saveAsTextFile("s3a://demo/output/wordcount_jupyter")

# Print the elapsed time, in seconds
end = time.perf_counter()
print(end - start)


In [None]:
# Finish app execution: stop the SparkContext. Cells that use `sc`, or RDDs
# created from it, cannot run after this point.
sc.stop()

In [None]:
# NOTE(review): this repeats the save already performed in the word-count cell
# and, in notebook order, runs after sc.stop(); it looks like a leftover cell
# and will fail on a top-to-bottom run -- consider deleting it.
sorted.coalesce(1).saveAsTextFile("s3a://demo/output/wordcount_jupyter")