# Spark notebook examples

In [None]:
# Show the SparkContext `sc`. It is used here without being defined in the notebook,
# so it is presumably pre-created by the kernel/launcher (confirm for your setup).
# As the last expression in the cell, its summary is rendered as the cell output.
sc

### This code estimates π by "throwing darts" at a circle. We pick random points in the unit square (from (0, 0) to (1, 1)) and count how many fall inside the unit circle. That fraction should be about π/4, so multiplying it by 4 gives our estimate.

In [None]:
import sys
from random import random
from operator import add

# How many darts to throw; more samples give a tighter estimate but take longer.
NUM_SAMPLES = 100000000


def inside(_index):
    """Throw one dart at the unit square and report whether it lands inside the unit circle.

    The argument is the RDD element (a sample index). It is ignored because every
    call draws its own fresh random point.
    """
    x = random()
    y = random()
    return x * x + y * y < 1


# One RDD element per dart; keep only the hits and count them.
samples = sc.parallelize(range(0, NUM_SAMPLES))
count = samples.filter(inside).count()

# hits / total ~ pi / 4
estimate = 4.0 * count / NUM_SAMPLES
print("Pi is roughly %f" % estimate)

# Spark RDD API example

In [None]:
# Distribute a local Python list (the integers 1 through 15) as an RDD
rdd = sc.parallelize(list(range(1, 16)))

In [None]:
# Number of elements in the RDD
row_count = rdd.count()
print(row_count)

In [None]:
# Fetch the first ten elements to the driver and print them
first_ten = rdd.take(10)
print(first_ten)

In [None]:
# Sort by the element itself, largest first (second positional arg is `ascending`)
descendingRdd = rdd.sortBy(lambda value: value, False)

# Show the first ten elements of the sorted RDD
print(descendingRdd.take(10))

In [None]:
# Keep only the elements strictly below 5
filteredRdd = rdd.filter(lambda n: 5 > n)

# Show up to ten of the surviving elements
print(filteredRdd.take(10))

In [None]:
# Turn each element n into the pair (n, 2n)
rdd2 = rdd.map(lambda n: (n, 2 * n))

# Show the first ten pairs
print(rdd2.take(10))

In [None]:
# Fold all elements into a single total by pairwise addition
result = rdd.reduce(lambda total, n: total + n)

print(result)

In [None]:
# Load a text file from HDFS / MinIO
#textFile = sc.textFile("hdfs://...")

In [None]:
# Save an RDD to HDFS
#textFile.saveAsTextFile("hdfs://...")

In [None]:
# Distribute a few short lines of text as an RDD (this rebinds `rdd`)
rdd = sc.parallelize(["aaa bbb ccc", "aaa bbb", "bbb ccc", "abc"])

# WordCount: split every line into words, pair each word with 1, then sum per word
words = rdd.flatMap(lambda line: line.split(" "))
pairs = words.map(lambda word: (word, 1))
results = pairs.reduceByKey(lambda a, b: a + b)

# Bring the (word, count) pairs back to the driver; shown as the cell output
results.collect()

# Spark SQL and DataFrame API examples

In [None]:
# Show the SparkSession `spark`. It is used here without being defined in the notebook,
# so it is presumably pre-created by the kernel/launcher (confirm for your setup).
spark

In [None]:
# toPandas() below needs pandas installed on the driver; if it is missing run:
# %pip install pandas

# Ten (number, letter) rows: numbers 1..10 paired with letters cycling a..e
data = list(zip(range(1, 11), "abcdeabcde"))

# Turn the local rows into a Spark DataFrame with named columns
df = spark.createDataFrame(data, ['numbers', 'letters'])

# Collect into pandas for a rich table display
df.toPandas()

In [None]:
# Register the DataFrame as a temporary view so SQL queries can refer to it by name.
# createOrReplaceTempView is the supported API; registerTempTable has been deprecated
# since Spark 2.0 and only warns before delegating to it.
df.createOrReplaceTempView("mytable")

# Perform a simple select from the view
results = spark.sql("select * from mytable")

# Convert the results to a Pandas DataFrame for easy viewing
results.toPandas()

In [None]:
# Rows whose number is below 8, largest number first
query = "select * from mytable where numbers < 8 order by numbers desc"
results = spark.sql(query)

# Display via pandas
results.toPandas()

In [None]:
# Per-letter aggregates: row count, mean and total of `numbers`
query = "select letters, count(*) as count, avg(numbers) as avg, sum(numbers) as sum from mytable group by letters"
results = spark.sql(query)

# Display via pandas
results.toPandas()

In [None]:
# Total number of rows in the DataFrame
row_count = df.count()
print(row_count)

# The first three rows, as a list of Row objects
first_rows = df.take(3)
print(first_rows)

In [None]:
# Order rows by `numbers`, largest first (`sort` is an alias of `orderBy`)
descendingDf = df.sort(df["numbers"].desc())

# Display via pandas
descendingDf.toPandas()

In [None]:
# Keep rows whose number is below 5 (`filter` is an alias of `where`)
filtered = df.filter(df["numbers"] < 5)

# Display via pandas
filtered.toPandas()

In [None]:
# Import the aggregate helpers under the `F` namespace. Importing `sum` directly
# (from pyspark.sql.functions import sum) would shadow Python's built-in `sum`
# for every later cell in this kernel session.
from pyspark.sql import functions as F

# Whole-DataFrame aggregations: mean and total of `numbers`, plus distinct counts
agg = df.agg(
    F.avg(df.numbers).alias("avg_numbers"),
    F.sum(df.numbers).alias("sum_numbers"),
    F.countDistinct(df.numbers).alias("distinct_numbers"),
    F.countDistinct(df.letters).alias("distinct_letters"),
)

# Convert the results to Pandas DataFrame
agg.toPandas()

In [None]:
# Summary statistics for the DataFrame's columns, printed as a text table
summary = df.describe()
summary.show()

# Set up the Spark Context by hand

In [None]:
# Stop the current SparkContext so that a new one can be built from an explicit
# SparkConf in the next cell.
sc.stop()

In [None]:
import os

from pyspark import SparkConf, SparkContext

# S3 / MinIO connection settings. Read them from the environment so that real
# credentials do not need to be committed with the notebook. The fallbacks are
# the values this tutorial was written against (presumably shared tutorial
# credentials; NOTE(review): drop the fallbacks for any non-tutorial use).
S3_ENDPOINT = os.environ.get("S3_ENDPOINT", "http://90.147.174.115:34900")
S3_ACCESS_KEY = os.environ.get("S3_ACCESS_KEY", "CCR")
S3_SECRET_KEY = os.environ.get("S3_SECRET_KEY", "tutorialCCR")

conf = (SparkConf()
        # Run executors on Kubernetes through the in-cluster API server
        .setMaster("k8s://https://kubernetes:443")
        .setAppName("MyApp")
        .set("spark.executor.memory", "1g")
        .set("spark.executor.instances", "2")
        .set("spark.kubernetes.container.image", "dodasts/spark:v3.0.0")
        # configure S3 access
        .set("spark.hadoop.fs.s3a.endpoint", S3_ENDPOINT)
        .set("spark.hadoop.fs.s3a.path.style.access", "true")
        .set("spark.hadoop.fs.s3a.access.key", S3_ACCESS_KEY)
        .set("spark.hadoop.fs.s3a.secret.key", S3_SECRET_KEY)
        .set("spark.hadoop.fs.s3a.fast.upload", "true")
        .set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
        .set("spark.hadoop.fs.s3a.committer.name", "directory")
        )

sc = SparkContext(conf=conf)
sc

# Accessing files from S3

In [None]:
# Build an RDD with one element per line of the text object in the `democcr` bucket
text_rdd = sc.textFile("s3a://democcr/CCR-Tutorial-Days.txt")

# Bring every line back to the driver and print the resulting list
lines = text_rdd.collect()
print(lines)

In [None]:
# Word count over the S3 text file, most frequent words first.
# str.split() with no argument splits on any run of whitespace and drops empty
# strings. With split(" "), blank lines and double spaces would be counted as the
# word "", and tabs / trailing "\r" would stay glued to neighbouring words.
counts = (text_rdd
          .flatMap(lambda line: line.split())
          .map(lambda word: (word, 1))
          .reduceByKey(lambda a, b: a + b)
          # swap to (count, word) so that sortByKey orders by frequency
          .map(lambda pair: (pair[1], pair[0]))
          .sortByKey(ascending=False))

# To persist the full result instead: counts.saveAsTextFile("s3a://democcr/out.txt")
counts.take(10)


In [None]:
from pyspark.sql import SparkSession

# Wrap the hand-built SparkContext in a SparkSession to use the DataFrame API
spark = SparkSession(sparkContext=sc)

# Read the CSV with default reader options, then show its schema and contents
csv_path = "s3a://democcr/Arrivi-e-presenze-turistiche-serie-storica-2003-2016.csv"
csv_df = spark.read.csv(csv_path)
csv_df.printSchema()
csv_df.toPandas()

In [None]:
# Re-read the same CSV, this time with ';' as the field separator, the first line as
# the header, and column types inferred from the data
csv_df = spark.read.csv(
    "s3a://democcr/Arrivi-e-presenze-turistiche-serie-storica-2003-2016.csv",
    sep=';',
    header=True,
    inferSchema=True,
)
csv_df.printSchema()
csv_df.toPandas()

# Cluster mode

In [None]:
# Stop the interactive SparkContext to free its resources before the
# standalone spark-submit job launched in the cells below.
sc.stop()

In [None]:
# Check that the spark-submit launcher used in the next cell exists at this path
! ls -l /usr/local/spark/bin/spark-submit

In [None]:
! /usr/local/spark/bin/spark-submit --master k8s://kubernetes:443 --deploy-mode cluster --name spark-pi \
        -c spark.driver.memory=2g -c spark.executor.instances=2 \
        -c spark.kubernetes.container.image=dodasts/spark:v3.0.0 \
        local:///usr/local/spark/examples/src/main/python/pi.py 5000