# Spark

### Introduction:

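This time you will create the data yourself: the notebook builds small in-memory datasets and uses them to practise basic PySpark RDD operations.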



### Step 1. Import the necessary libraries

In [1]:
import numpy as np
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import col , when , lit , udf
import os
import sys

# Make PySpark use the same Python interpreter that is running this notebook,
# for both the PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON settings.
os.environ.update(
    PYSPARK_PYTHON=sys.executable,
    PYSPARK_DRIVER_PYTHON=sys.executable,
)


In [2]:
# Create (or reuse, via getOrCreate) a local Spark session named 'learn-spark'.
spark_sql = (
    SparkSession.builder
    .master('local[*]')
    .appName('learn-spark')
    .getOrCreate()
)
# NOTE: despite its name, `spark` is the session's SparkContext (used below to
# build RDDs); the SparkSession itself is `spark_sql`.
spark = spark_sql.sparkContext

In [7]:
# Build an RDD from a small list of integers, split across 4 partitions.
rdd = spark.parallelize([4, 3, 2, 1, 7, 8], numSlices=4)
print(type(rdd))
rdd.collect()

<class 'pyspark.rdd.RDD'>


[4, 3, 2, 1, 7, 8]

In [5]:
print(rdd.getNumPartitions()) # number of partitions backing the RDD
# glom() turns each partition into one list, so the collected result shows how
# the elements are distributed across partitions.
rdd.glom().collect()

4


[[4], [3, 2], [1], [7, 8]]

In [6]:
def test1(x,y):
    print(x , y)
    print(x + y)
    return x+y
# Sum all elements with the tracing reducer. The trace below combines 4, 5, 1
# and 15, which are the per-partition sums of [[4], [3, 2], [1], [7, 8]], so
# only the final merge of partition results shows up in the notebook output.
print(rdd.reduce(test1))

4 5
9
9 1
10
10 15
25
25


In [12]:
# Keep only the elements strictly greater than 3.
rddResult = rdd.filter(lambda value: value > 3)
rddResult.collect()

[4, 7, 8]

In [13]:
# Show the full RDD, then take(3): the first three elements returned as a list.
print(rdd.collect())
rdd.take(3)

[4, 3, 2, 1, 7, 8]


[4, 3, 2]

In [14]:
# Load a text file (relative path) as an RDD. Each element is a string (see the
# output below), presumably one per line of the file.
data = spark.textFile("data.txt") # read file txt
print(type(data))
data.collect()

<class 'pyspark.rdd.RDD'>


['1', '2', '3', '4', '5', '6']

In [15]:
# Number of elements in the RDD loaded from data.txt.
data.count()

6

In [17]:

# Sort the filtered values by their own value and collect them into a list.
sort = rddResult.sortBy(lambda value: value).collect()
sort

[4, 7, 8]

In [18]:
# Randomly split the RDD with weights 3:2 and show the first part.
# A fixed seed makes the split reproducible from run to run; without it the
# selected elements change on every execution.
rdd.randomSplit([3, 2], seed=42)[0].collect()

[3, 2, 1, 7]

In [19]:
# Largest element of the RDD.
rdd.max()

8

In [20]:
# Smallest element of the RDD.
rdd.min()

1

In [21]:
# Arithmetic mean of the elements (25 / 6 for this data).
rdd.mean()

4.166666666666667

In [22]:
# Two RDDs with the same number of elements.
m = spark.parallelize([3,4,7,8])
n = spark.parallelize([1,2,5,6])
# zip pairs elements by position: the i-th element of m with the i-th of n.
m.zip(n).collect()

[(3, 1), (4, 2), (7, 5), (8, 6)]

In [26]:
# union appends the elements of rddResult to those of rdd and keeps duplicates;
# despite the variable name, this is not a key-based join.
rdd_join = rdd.union(rddResult)
rdd_join.collect()

[4, 3, 2, 1, 7, 8, 4, 7, 8]

In [27]:
rdd_join.top(3) # the 3 largest elements, in descending order (duplicates included)

[8, 8, 7]

In [28]:
# Drop duplicate values. The result does not keep the input order (see output).
rdd_join.distinct().collect()

[8, 1, 2, 3, 4, 7]

In [29]:
# RDD of 2-tuples, treated as (key, value) pairs below; key 1 appears twice.
rdd_tuple = spark.parallelize([
    (1, 2),
    (3, 4),
    (5, 6),
    (1, 4),
    (2, 5),
])

In [30]:
# Count how many tuples share each key (the first tuple element).
rdd_tuple.countByKey()

defaultdict(int, {1: 2, 3: 1, 5: 1, 2: 1})

In [31]:
# Count how many times each distinct element occurs in the RDD.
rdd_join.countByValue()

defaultdict(int, {4: 2, 3: 1, 2: 1, 1: 1, 7: 2, 8: 2})

In [32]:
# True only when the RDD has no elements.
rdd_join.isEmpty()

False

In [33]:
# Add 1 to every even element; odd elements pass through unchanged.
test = rdd_join.map(lambda value: (value + 1) if value % 2 == 0 else value)
test.collect()

[5, 3, 3, 1, 7, 9, 5, 7, 9]

In [34]:
# Standard deviation of the elements; the value below matches the population
# formula (divide by N, not N - 1).
test.stdev()

2.629368792488718

In [35]:
# RDD of (name, number) tuples spread over 4 partitions.
rdd_df = spark.parallelize(
    [('Khang', 22), ('Tai', 22), ('Hoang', 22), ('Huy', 22)],
    numSlices=4,
)
rdd_df.collect()

[('Khang', 22), ('Tai', 22), ('Hoang', 22), ('Huy', 22)]

In [36]:
# Halve every element; `/` is true division, so the results are floats.
# NOTE: rdd_join is rebound to a transformation of itself, so re-running this
# cell halves the values again. Run it exactly once per kernel session.
rdd_join = rdd_join.map(lambda x : x/2)
rdd_join.collect()

[2.0, 1.5, 1.0, 0.5, 3.5, 4.0, 2.0, 3.5, 4.0]

In [37]:
# Rebind `rdd` (previously a list of integers) to an RDD of (key, value) tuples.
rdd = spark.parallelize([(0,1),(0,3),(1,5),(1,7)])
rdd.collect()

[(0, 1), (0, 3), (1, 5), (1, 7)]

In [38]:
# Double both members of every tuple. `rdd` is rebound to its own
# transformation, so running this cell again doubles the values once more.
rdd = rdd.map(lambda pair: (pair[0] * 2, pair[1] * 2))
rdd.collect()

[(0, 2), (0, 6), (2, 10), (2, 14)]

In [39]:
# Count how many tuples share each (doubled) key.
rdd.countByKey()

defaultdict(int, {0: 2, 2: 2})

In [40]:
def test (x,y):
    return x + y 

# Merge the values of each key with `test`, i.e. sum the values per key.
rdd.reduceByKey(test).collect()

[(0, 8), (2, 24)]

In [45]:
# Flat list of integers used for the map / filter / reduce and groupBy examples.
rddFilter = spark.parallelize([0, 2, 0, 6, 2, 10, 2, 14])
rddFilter.collect()

[0, 2, 0, 6, 2, 10, 2, 14]

In [46]:
# Double each element, keep the doubled values greater than 15, then add up
# what is left (20 + 28 for this data).
(
    rddFilter
    .map(lambda value: value * 2)
    .filter(lambda doubled: doubled > 15)
    .reduce(lambda total, current: total + current)
)

48

In [48]:
print(rddFilter.collect())
# Group the elements by whether they are greater than 8 (keys are False / True).
result = rddFilter.groupBy(lambda value: value > 8).collect()
# Turn each group's members into a plain list so they display readably.
[(key, list(members)) for key, members in result]

[0, 2, 0, 6, 2, 10, 2, 14]


[(False, [0, 2, 0, 6, 2, 2]), (True, [10, 14])]

In [49]:
# RDD containing repeated values; distinct() keeps one copy of each.
rddDuplicate = spark.parallelize([1,1,1,2,2,3,4,5,4,3])
rddDuplicate.distinct().collect()

[1, 2, 3, 4, 5]

In [5]:
# Rebind `rdd` once more, this time to three (key, value) tuples with unique keys.
rdd = spark.parallelize([(1,2) , (4,5) , (7,8)])
rdd.collect()

[(1, 2), (4, 5), (7, 8)]

[(4, [5]), (1, [2]), (7, [8])]