<img src="sztaki_logo.jpg" height="112" width="400" align="left"><br>
<br>

# Test Spark Cluster

### Import Python libraries

In [1]:
from pyspark import SparkContext, SparkConf

In [2]:
# Host address of the Spark master, also used below as the HDFS namenode host.
# NOTE(review): the xxx...xxx value looks like a placeholder that is presumably
# substituted with the real IP at deployment time — confirm before running.
SparkMasterIP = "xxxSPARKMASTERIPxxx"

### Start Spark Application

In [3]:
# Connect to the Spark master on port 7077.
# For a quick test without a cluster, run in local mode instead:
#     sc = SparkContext(appName="test", master="local")
master_url = "spark://" + SparkMasterIP + ":7077"
sc = SparkContext(appName="test", master=master_url)

In [4]:
# Display the SparkContext as the cell output to confirm it was created.
sc

In [5]:
import sys
from random import random
from operator import add
from pyspark.sql import SparkSession

In [6]:
# Obtain a SparkSession against the same master.
# NOTE(review): `sc` is already running, so getOrCreate() presumably reuses that
# SparkContext and the "PythonPi" app name may not take effect — confirm in the UI.
spark = (
    SparkSession.builder
    .master("spark://" + SparkMasterIP + ":7077")
    .appName("PythonPi")
    .getOrCreate()
)

In [7]:
# Display the SparkSession as the cell output to confirm it was created.
spark

In [8]:
# Number of partitions (slices) the Pi-estimation RDD is split into.
partitions = 10

In [9]:
# Total number of random points to sample. The per-partition count is named
# (instead of a bare 100000) so the workload per slice is easy to tune; the
# total scales with `partitions`.
samples_per_partition = 100000
n = samples_per_partition * partitions

In [10]:
def f(_):
    """Single Monte Carlo trial for estimating pi.

    Picks a random point in the square [-1, 1) x [-1, 1) and reports whether
    it lies inside (or on) the unit circle. The argument is ignored; it only
    exists so the function can be passed to ``RDD.map``.

    Returns:
        int: 1 if the point is inside the unit circle, otherwise 0.
    """
    px = 2 * random() - 1
    py = 2 * random() - 1
    if px ** 2 + py ** 2 > 1:
        return 0
    return 1

In [11]:
# Run n independent trials across `partitions` slices and count the hits.
# The circle/square area ratio is pi/4, so pi ~= 4 * hits / trials.
sample_ids = spark.sparkContext.parallelize(range(1, n + 1), partitions)
count = sample_ids.map(f).reduce(add)
pi_estimate = 4.0 * count / n
print("Pi is roughly %f" % pi_estimate)

Pi is roughly 3.140460


### Other Python libraries

In [12]:
from pyspark.sql import Row
from pyspark.sql import SparkSession

### Create an RDD and fill in a series of data

In [13]:
# A small fixed dataset of Row records.
people = [
    Row(name='Steve', age=40, id=1),
    Row(name='Lui', age=10, id=2),
    Row(name='Mike', age=99, id=3),
]
# numSlices=3 requests three partitions; verified later with getNumPartitions().
rdd = sc.parallelize(people, numSlices=3)
rdd.collect()

[Row(name='Steve', age=40, id=1),
 Row(name='Lui', age=10, id=2),
 Row(name='Mike', age=99, id=3)]

### Create a DataFrame from RDD

In [14]:
# Convert the RDD of Row objects into a DataFrame; the column names come from the Row fields.
dataFrame = rdd.toDF()

### Data in the RDD (converted to a DataFrame for display)

In [15]:
# Show the RDD's rows via a throwaway DataFrame conversion (same content as `dataFrame`).
rdd.toDF().show()

+-----+---+---+
| name|age| id|
+-----+---+---+
|Steve| 40|  1|
|  Lui| 10|  2|
| Mike| 99|  3|
+-----+---+---+



### Data in DataFrame

In [16]:
# Show the same rows from the DataFrame created earlier.
dataFrame.show()

+-----+---+---+
| name|age| id|
+-----+---+---+
|Steve| 40|  1|
|  Lui| 10|  2|
| Mike| 99|  3|
+-----+---+---+



### Get the number of partitions

In [17]:
# The source RDD was created with numSlices=3, so the DataFrame's underlying RDD
# is expected to keep 3 partitions. Assert it instead of only commenting it, so
# a Restart & Run All actually catches a mismatch; the value is still displayed.
num_partitions = dataFrame.rdd.getNumPartitions()
assert num_partitions == 3, "expected 3 partitions, got %d" % num_partitions
num_partitions

3

### Stop Spark Application

In [18]:
# Shut down the first application.
# NOTE(review): `spark` was built with getOrCreate() while `sc` was already
# running, so it presumably wraps the same SparkContext; if so, spark.stop()
# mainly clears the session bookkeeping after sc.stop() — confirm.
sc.stop()
spark.stop()

# Test HDFS

In [19]:
# Download a sample text file (an Occopus tutorial .rst page) to the local filesystem.
!wget https://raw.githubusercontent.com/occopus/docs/master/sphinx/source/tutorial-bigdata-ai.rst -O /home/sparkuser/text.txt

--2021-02-17 19:13:25--  https://raw.githubusercontent.com/occopus/docs/master/sphinx/source/tutorial-bigdata-ai.rst
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.109.133, 185.199.111.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.109.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 47380 (46K) [text/plain]
Saving to: ‘/home/sparkuser/text.txt’


2021-02-17 19:13:25 (10.6 MB/s) - ‘/home/sparkuser/text.txt’ saved [47380/47380]



In [20]:
!/home/sparkuser/hadoop/bin/hdfs dfs -put /home/sparkuser/text.txt /

In [21]:
# Start a fresh application (the previous one was stopped) and open the uploaded file.
# NOTE(review): assumes the HDFS namenode runs on the Spark master host at port 9000.
sc = SparkContext(appName="test", master="spark://" + SparkMasterIP + ":7077")
hdfs_uri = "hdfs://" + SparkMasterIP + ":9000/text.txt"
distFile = sc.textFile(hdfs_uri)

In [22]:
# Drop zero-length lines (whitespace-only lines are kept) and report how many remain.
nonempty_lines = distFile.filter(lambda line: line != '')
nonempty_count = nonempty_lines.count()
print('Nonempty lines', nonempty_count)

Nonempty lines 484


In [23]:
# Tokenize each line on runs of whitespace. split(' ') would emit an empty-string
# "word" for every extra space (e.g. the indentation in the .rst source), and
# those tokens would then dominate the word counts; split() with no argument
# also handles tabs and never yields empty tokens.
words = nonempty_lines.flatMap(lambda line: line.split())

In [24]:
# Word count: emit (word, 1), sum per word, then flip to (count, word) so that
# sortByKey orders by frequency, highest first.
word_pairs = words.map(lambda word: (word, 1))
totals = word_pairs.reduceByKey(lambda a, b: a + b)
wordcounts = totals.map(lambda pair: (pair[1], pair[0])).sortByKey(ascending=False)
print('Top 100 words:')
print(wordcounts.take(100))

Top 100 words:


In [25]:
# Stop the SparkContext created for the HDFS test. The `spark` session from the
# first section was already stopped there, so it is not touched again; this also
# keeps this section free of any dependency on variables from the first section.
sc.stop()