In [None]:
from pyspark import SparkContext, SparkConf

In [None]:
# # 创建连接配置,连接到standalone模式的集群
# conf = SparkConf().setAppName('sparkRddDemo').setMaster("spark://sparkstandalone:7077")
# # 设定driver的地址，非常重要，standalone模式的集群
# conf.set("spark.driver.host","192.168.88.1")
# # 获取spark上下文,创建到集群的连接
# sc =  SparkContext(conf=conf)

In [None]:
# Build the connection settings for a local run ("local[2]" master).
conf = SparkConf()
conf.setAppName('sparkRddDemo')
conf.setMaster("local[2]")
# Obtain the Spark context, which opens the connection described by conf.
sc = SparkContext(conf=conf)

In [None]:
class A:
    """Minimal wrapper that holds a single integer in the attribute ``a``."""

    def __init__(self, a: int) -> None:
        """Store ``a`` on the instance."""
        self.a = a

    def get(self) -> int:
        """Return the stored value."""
        return self.a

    def __str__(self) -> str:
        """Return the stored value rendered as text."""
        text = str(self.a)
        return text

In [None]:
# RDDs can be created in two ways: by parallelizing an existing iterable or
# collection through the context, or by loading data from a storage system.
# This cell uses the first way, starting from an in-memory list.
# (Alternative input: data = list('ddddeeee'))
data = list(range(1, 6))
distData = sc.parallelize(data)
print(type(distData))
print(distData.collect())
print(distData.reduce(lambda a, b: a + b))

In [None]:
# Create an RDD from the text files matching the glob "./*.md".
distFile = sc.textFile("./*.md")
print(distFile.collect())
# Map every element to its length and add the lengths together.
print(distFile.map(lambda s: len(s)).reduce(lambda a, b: a + b))

In [None]:
# Create an RDD from the files under "./" using wholeTextFiles.
# NOTE(review): per the PySpark docs each record is a (path, content) pair — confirm.
distFiles = sc.wholeTextFiles("./")
# distFiles.map(lambda line:len(line)).reduce(lambda a,b:a+b)
print(distFiles.collect())

In [None]:
# Persist distFile (note: distFile, not distFiles) to "ts.pickle" in pickle format.
distFile.saveAsPickleFile("ts.pickle")

In [None]:
# Load the pickled data back from "ts.pickle" and show its type and contents.
ds = sc.pickleFile("ts.pickle")
print(type(ds))
ds.collect()

In [None]:
# Save a SequenceFile: build (x, "a" * x) pairs for x in 1..3 and write them
# to 'sequence/to/file'.
rdd = sc.parallelize(range(1,4)).map(lambda x:(x,"a"*x))
print(type(rdd))
rdd.saveAsSequenceFile('sequence/to/file')

In [None]:
# Read the SequenceFile back and sort the collected records.
sorted(sc.sequenceFile('sequence/to/file').collect())

In [None]:
# Read "data.txt" and map every element to its length.
lines = sc.textFile("data.txt")
lineLengths = lines.map(lambda s:len(s))
# This prints the RDD object itself, not its elements.
print(lineLengths)
# reduce adds the individual lengths into a single total.
totalLengths = lineLengths.reduce(lambda a,b:a+b)
print(totalLengths)

In [None]:
# Read data through a Hadoop InputFormat (CombineTextInputFormat) from "./test/".
path = "./test/"
rdd = sc.newAPIHadoopFile(
    path=path,
    inputFormatClass="org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat",
    keyClass="org.apache.hadoop.io.LongWritable",
    valueClass="org.apache.hadoop.io.Text",
    conf={
     # 4194304 = 4 * 1024 * 1024; presumably a byte count (4 MiB) — confirm
     # against the Hadoop configuration docs.
     "mapreduce.input.fileinputformat.split.maxsize": "4194304"
     # "mapreduce.input.fileinputformat.split.minsize":" 4194304"
    }
)

In [None]:
# Word count: split the second field (x[1]) of every record on single spaces,
# pair each token with 1, then add up the ones per token.
word_kv =rdd.flatMap(lambda x:x[1].split(' ')).map(lambda x:(x,1))
word_kv.reduceByKey(lambda a,b:a+b).collect()

In [None]:
# Read data through a Hadoop InputFormat (TextInputFormat), this time from an
# HDFS directory.
path = "hdfs://sparkstandalone:8020/data/"
rdd = sc.newAPIHadoopFile(
    path=path,
    inputFormatClass="org.apache.hadoop.mapreduce.lib.input.TextInputFormat",
    keyClass="org.apache.hadoop.io.LongWritable",
    valueClass="org.apache.hadoop.io.Text",
)

In [None]:
# Word count: split the second field (x[1]) of every record on single spaces,
# pair each token with 1, then add up the ones per token.
word_kv =rdd.flatMap(lambda x:x[1].split(' ')).map(lambda x:(x,1))
word_kv.reduceByKey(lambda a,b:a+b).collect()

In [None]:
# Same approach as the local textFile example, but reading from another file
# system; the path must then carry the file-system scheme,
# e.g. hdfs://, s3a://,
distFile = sc.textFile("hdfs://sparkstandalone:8020/data/data.txt")
# SparkContext.wholeTextFiles can read all the files under a directory.
# Map every element to its length and add the lengths together.
print(distFile.map(lambda s: len(s)).reduce(lambda a, b: a + b))

In [None]:
# Calling textFile does not load the data into memory yet.
lines = sc.textFile("hdfs://sparkstandalone:8020/data/data.txt")
# map is a transformation, so it does not load the data either.
lineLengths = lines.map(lambda s: len(s))
# reduce is an action: it triggers the computation and returns a single
# result. If the RDD is needed again afterwards, persist it first so it is
# kept in memory.
totalLength = lineLengths.reduce(lambda a, b: a + b)
print(totalLength)

In [None]:
# Spark lets you pass your own named function to a transformation; this one
# counts the words in a single line.
def wordCount(s: str) -> int:
    """Return the number of whitespace-separated words in ``s``.

    Splitting with no argument collapses runs of whitespace and ignores
    leading/trailing whitespace, so an empty or blank line counts as zero
    words and repeated spaces do not produce phantom empty words.
    """
    return len(s.split())

# Apply wordCount to every element of `lines` and collect the results.
lines.map(wordCount).collect()

In [None]:
# Processing can also live in a method of an object, but then the whole
# object is sent to the cluster, because the function handed to map is a
# bound method of that object.
class MyClass:
    """Demo: passing a bound method to ``rdd.map``."""

    def func(self, s):
        """Return ``s`` unchanged."""
        return s

    def doStuff(self, rdd):
        """Map ``rdd`` through the bound method ``self.func``."""
        bound = self.func
        return rdd.map(bound)
# Run doStuff on `lines`, collect the mapped elements and print them.
handler = MyClass()
result = handler.doStuff(lines).collect()
print(result)

In [None]:
# Likewise, when the function handed to the RDD reads another attribute of
# the object through `self`, the whole object is sent to the cluster.
class MyClass:
    """Demo: a mapped function that reads ``self.field``."""

    def __init__(self):
        self.field = "Hello"

    def doStuff(self, rdd):
        """Map ``rdd`` so that each element is prefixed with ``self.field``."""
        def prepend(s):
            # Looks the attribute up through self on every call.
            return self.field + s

        return rdd.map(prepend)
# Run doStuff on `lines`, collect the mapped elements and print them.
handler = MyClass()
result = handler.doStuff(lines).collect()
print(result)

In [None]:
# Referencing `self` inside the mapped function wastes memory, because the
# whole object travels with it. The better way is to copy the attribute into
# a local variable first, so the mapped function does not refer to the object.
class MyClass:
    """Demo: a mapped function that captures only a local copy of a field."""

    def __init__(self):
        self.field = "Hello"

    def doStuff(self, rdd):
        """Map ``rdd`` so that each element is prefixed with the field's value."""
        # Local copy: the nested function closes over this name, not over self.
        field = self.field

        def prepend(s):
            return field + s

        return rdd.map(prepend)
# Run doStuff on `lines`, collect the mapped elements and print them.
handler = MyClass()
result = handler.doStuff(lines).collect()
print(result)

In [None]:
# Demonstration of a closure pitfall: updating a driver-side global from
# inside an RDD operation.
counter = 0
rdd = sc.parallelize(data)

# Wrong: Don't do this!!
# increment_counter adds each element to the module-level `counter`.
# NOTE(review): per the Spark programming guide, the function runs on the
# executors against their own copy of the closure, so the value printed
# below is not guaranteed to be the sum — use an accumulator instead.
def increment_counter(x):
    global counter
    counter += x
rdd.foreach(increment_counter)

print("Counter value: ", counter)

In [None]:
lines = sc.textFile("hdfs://sparkstandalone:8020/data/data.txt")
# Turn every element into a key/value pair: the key is the element itself
# (it is not split into words here) and the value is 1.
pairs = lines.map(lambda s:(s,1))
# Add up the values per key. Note that reduceByKey is a transformation, not
# an action.
counts = pairs.reduceByKey(lambda a,b:a+b)
# collect is an action.
print(counts.collect())
# Sort by key in descending order (ascending=False).
print(counts.sortByKey(ascending=False).collect())

In [None]:
# Create a broadcast variable that wraps the list `a`.
a = [1,2,3,4]
broadcastVar = sc.broadcast(a)

In [None]:
# Read the broadcast variable's value.
broadcastVar.value

In [None]:
# Rebind broadcastVar to a new broadcast variable wrapping `b`, then read it.
b = [1,2,3,4,5,6,7,8,9,0,]
broadcastVar = sc.broadcast(b)
broadcastVar.value

In [None]:
# Destroy the broadcast variable.
# NOTE(review): the positional True is presumably the `blocking` flag — confirm
# against the installed PySpark version.
broadcastVar.destroy(True)

In [None]:
# Read the value again after destroy().
# NOTE(review): the PySpark docs say a destroyed broadcast variable cannot be
# used again; what this returns on the driver should be confirmed.
broadcastVar.value

In [None]:
# Create an accumulator starting at 0 and add every element of 1..9 to it
# through foreach.
accum = sc.accumulator(0)
# This bare expression is not the last line of the cell, so its result is discarded.
type(accum)
print(accum)
sc.parallelize([1,2,3,4,5,6,7,8,9]).foreach(lambda x:accum.add(x))

In [None]:
from pyspark.accumulators import AccumulatorParam

# Element-wise vector addition for use with a custom accumulator.
class VectorAccumulatorParam(AccumulatorParam):
    """Accumulator parameter that adds list "vectors" element by element."""

    def zero(self, value: list) -> list:
        """Return a list of ``0.0`` with the same length as ``value``."""
        return [0.0 for _ in range(len(value))]

    # The accumulator exposes an add method; this is the operation behind it.
    def addInPlace(self, value1: list, value2: list) -> list:
        """Add ``value2`` into ``value1`` position by position and return ``value1``.

        ``value1`` is modified in place; every index of ``value1`` is read
        from ``value2`` as well.
        """
        for idx, _ in enumerate(value1):
            value1[idx] += value2[idx]
        return value1

In [None]:
# Initialise a vector accumulator with [1, 2, 3] using the custom param class.
va = sc.accumulator([1,2,3],VectorAccumulatorParam())
print("init:",va.value)
# data becomes [[1, 1, 1], [2, 2, 2], [3, 3, 3]].
data = [[x]*3 for x in range(1,4)]
# data = [1,2,3]
rdd = sc.parallelize(data)
# # Define a function that performs this accumulation
# def g(x):
#     global va
#     va.add([x]*3) # works this way when data = [1,2,3]
# rdd.foreach(g)
# print("after oper:",va.value)

In [None]:
# Only a transformation is declared here; its result is discarded.
rdd.map(lambda x:va.add(x))
# No action has run on the RDD yet, so the accumulator is not modified!
print(va.value)
# Expected output: [1, 2, 3]

In [None]:
# Once an action (collect) runs, the map executes and the accumulator is updated!
rdd.map(lambda x:va.add(x)).collect()
print(va.value)

In [None]:
# Rebuild the RDD from the current contents of `data`.
rdd = sc.parallelize(data)

In [None]:
# Fetch the first element of the RDD.
rdd.take(1)

In [None]:
# Print the accumulator to inspect its current value.
print(accum)

In [None]:
# Accumulator updates made inside a lazy transformation are only applied once
# an action forces that transformation to run.
accum = sc.accumulator(0)


def g(x):
    """Add ``x`` to ``accum`` as a side effect and return ``x`` unchanged.

    The original returned ``f(x)``, but no ``f`` is defined anywhere in this
    notebook, so the element is passed through instead.
    """
    accum.add(x)
    return x


# `data` is a plain Python list and has no `map`, so the transformation is
# declared on an RDD instead. Integers are used because `accum` is an integer
# accumulator, whereas `data` may hold lists at this point in the notebook.
# No action is triggered here, so `accum` keeps its initial value.
sc.parallelize(range(1, 6)).map(g)

In [None]:
# Shut down the Spark context.
sc.stop()

In [None]:
# Map every element of distFile to its length and add the lengths together.
# NOTE(review): the preceding cell calls sc.stop(); this presumably needs an
# active SparkContext — confirm the intended run order.
distFile.map(lambda s:len(s)).reduce(lambda a,b:a+b)

In [None]:
# Shut down the Spark context.
sc.stop()

In [None]:
from pyspark.sql import SparkSession

In [None]:
# reduce is an action: it already returns the combined value as a plain Python
# object, which has no collect() method, so nothing is chained after it.
distData.reduce(lambda a, b: a + b)