In [1]:
import os

import findspark

# Locate the Spark installation. SPARK_HOME takes precedence when it is set
# (and non-empty) so the notebook is not tied to one machine; otherwise the
# original local install path is used.
findspark.init(os.environ.get("SPARK_HOME") or "A:/Zhao/bigdata/spark")

In [2]:
# SparkSession is the entry point used to configure and start Spark
from pyspark.sql import SparkSession

# Create the session (or reuse one that already exists in this kernel)
spark = (
    SparkSession.builder
    .master("local")
    .appName("Spark test")
    .config("spark.executor.memory", "1gb")
    .getOrCreate()
)

# SparkContext handle used by the RDD examples in the following cells
sc = spark.sparkContext

In [6]:
logFile = "A:/Zhao/bigdata/spark/README.md"
# The RDD is scanned twice below, so cache it after the first read.
logData = sc.textFile(logFile).cache()


def _contains(letter):
    """Return a predicate that is true for strings containing `letter`."""
    return lambda line: letter in line


numAs = logData.filter(_contains('a')).count()
numBs = logData.filter(_contains('b')).count()
print("Lines with a: " + str(numAs) + ", lines with b: " + str(numBs))

Lines with a: 61, lines with b: 30


In [9]:
# Small in-memory corpus used by the RDD examples that follow
_word_list = [
    "scala",
    "java",
    "hadoop",
    "spark",
    "akka",
    "spark vs hadoop",
    "pyspark",
    "pyspark and spark",
]
words = sc.parallelize(_word_list)

# count() returns the number of elements in the RDD
words.count()

8

In [10]:
# collect() returns every element of the RDD to the driver as a Python list
words.collect()

['scala',
 'java',
 'hadoop',
 'spark',
 'akka',
 'spark vs hadoop',
 'pyspark',
 'pyspark and spark']

In [11]:
def f(x):
    """Print a single RDD element."""
    print(x)


# foreach() applies f to each element for its side effect only.
# NOTE(review): no output is shown under this cell -- presumably print() runs
# in the Spark worker process and writes to the Jupyter server console; confirm.
words.foreach(f)

In [12]:
# filter(): keep only the entries that contain the substring 'spark'
words_filter = words.filter(lambda word: 'spark' in word)
words_filter.collect()

['spark', 'spark vs hadoop', 'pyspark', 'pyspark and spark']

In [13]:
# map(): turn each word into a (word, 1) pair
words_map = words.map(lambda word: (word, 1))
words_map.collect()

[('scala', 1),
 ('java', 1),
 ('hadoop', 1),
 ('spark', 1),
 ('akka', 1),
 ('spark vs hadoop', 1),
 ('pyspark', 1),
 ('pyspark and spark', 1)]

In [14]:
# reduce(f): fold all elements of the RDD into one value using the
# two-argument function f

from operator import add

nums = sc.parallelize([1, 2, 3, 4, 5])
# Sum of the five numbers
nums.reduce(add)

15

In [15]:
# join(other, numPartitions=None): match up the elements of two (key, value)
# RDDs by key; each result is (key, (value_from_x, value_from_y))

x = sc.parallelize([("spark", 1), ("hadoop", 4)])
y = sc.parallelize([("spark", 2), ("hadoop", 5)])
joined = x.join(y)
joined.collect()

[('hadoop', (4, 5)), ('spark', (1, 2))]

In [16]:
# cache()
# Persist this RDD with the default storage level (MEMORY_ONLY).
# You can also check whether the RDD is cached via its is_cached attribute.
words.cache()
# No extra persist() call is needed: cache() has already marked the RDD.
words.is_cached

True

----------------------------------------------
**PySpark - Broadcast & Accumulator**

For parallel processing, Apache Spark uses shared variables. 
When the driver sends a task to an executor, a copy of each shared variable is placed on every node of the cluster, 
so that the tasks running there can use it.

There are two types of shared variables supported by Apache Spark −
**Broadcast & Accumulator**

**Broadcast**
Broadcast variables are used to keep a read-only copy of data on every node. 
The variable is cached once on each machine instead of being shipped along with every task. 
A Broadcast variable has an attribute called `value`, which stores the data and returns the broadcast value.

In [18]:
# Wrap a list in a broadcast variable; .value gives back the wrapped data
words_new = sc.broadcast(["scala", "java", "hadoop", "spark", "akka"])

data = words_new.value
print(data)

# Index into the broadcast list like any other Python list
elem = data[2]
print(elem)

['scala', 'java', 'hadoop', 'spark', 'akka']
hadoop


**Accumulator**
Accumulator variables are used to aggregate information through associative and commutative operations. 
For example, you can use an accumulator for a sum operation or for counters (as in MapReduce).
Like a broadcast variable, an Accumulator has an attribute called `value`. 
It stores the accumulated data and returns the accumulator's value, but it can be read only in the driver program.

In [22]:
# An accumulator has to be created with an initial value
num = sc.accumulator(0)


def f(x):
    """Add one RDD element to the shared accumulator `num`."""
    num.add(x)


rdd = sc.parallelize([10, 20, 30, 40, 50])
rdd.foreach(f)

# Read the aggregated total back on the driver
final = num.value
print(final)

150


In [26]:
# SparkFiles: look up the local path of a file registered with sc.addFile()

from pyspark import SparkFiles
# Full path of the file to register, and the bare file name used to look it up
finddistance = "A:/Zhao/bigdata/spark/examples/src/main/python/mllib/logistic_regression.py"
finddistancename = "logistic_regression.py"
sc.addFile(finddistance)
# Returns the path of Spark's own copy of the file (a temp directory, per the output below)
SparkFiles.get(finddistancename)

'C:\\Users\\hez\\AppData\\Local\\Temp\\spark-9a5bcb07-aa7b-45e2-8df2-c7e0747f485a\\userFiles-01a78d99-306b-4f4a-a636-1bd73f98709f\\logistic_regression.py'

In [27]:
# StorageLevel: choose how a persisted RDD is stored

import pyspark

rdd1 = sc.parallelize([1, 2])
# Persist with an explicit, non-default storage level
rdd1.persist(pyspark.StorageLevel.MEMORY_AND_DISK_2)
# print() shows the human-readable description of the level; the earlier bare
# getStorageLevel() call discarded its result and has been dropped.
print(rdd1.getStorageLevel())

Disk Memory Serialized 2x Replicated


--------------------------------------------
**MLlib**

**test.data**
1,1,5.0
1,2,1.0
1,3,5.0
1,4,1.0
2,1,5.0
2,2,1.0
2,3,5.0
2,4,1.0
3,1,1.0
3,2,5.0
3,3,1.0
3,4,5.0
4,1,1.0
4,2,5.0
4,3,1.0
4,4,5.0

In [34]:
from __future__ import print_function
from pyspark import SparkContext
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating
if __name__ == "__main__":
       #sc = SparkContext(appName="Pspark mllib Example")
       data = sc.textFile("A:/Zhao/bigdata/test/test.data")
       ratings = data.map(lambda l: l.split(','))\
          .map(lambda l: Rating(int(l[0]), int(l[1]), float(l[2])))
   
       # Build the recommendation model using Alternating Least Squares
       rank = 10
       numIterations = 10
       model = ALS.train(ratings, rank, numIterations)
   
       # Evaluate the model on training data
       testdata = ratings.map(lambda p: (p[0], p[1]))
       predictions = model.predictAll(testdata).map(lambda r: ((r[0], r[1]), r[2]))
       ratesAndPreds = ratings.map(lambda r: ((r[0], r[1]), r[2])).join(predictions)
       MSE = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).mean()
       print("Mean Squared Error = " + str(MSE))
   
       # Save and load model
       model.save(sc, "A:/Zhao/bigdata/test/myCollaborativeFilter")
       sameModel = MatrixFactorizationModel.load(sc, "A:/Zhao/bigdata/test/myCollaborativeFilter")

Mean Squared Error = 5.145170763112815e-06


In [36]:
# Stop the Spark session now that all examples have run
spark.stop()