In [46]:
import pyspark
from pyspark import SparkContext
from pyspark.sql import SQLContext

In [3]:
master = 'local'
sc = SparkContext(master, 'Sum')

In [7]:
arr = list(range(7))
arr

[0, 1, 2, 3, 4, 5, 6]

In [8]:
vec = sc.parallelize(arr ,2)
vec, type(vec), vec.collect()

(ParallelCollectionRDD[2] at parallelize at PythonRDD.scala:195,
 pyspark.rdd.RDD,
 [0, 1, 2, 3, 4, 5, 6])

# spark RDD 转化操作

In [9]:
## map
r1 = vec.map(lambda x: x+1)
r1.collect()

[1, 2, 3, 4, 5, 6, 7]

In [11]:
### filter
r2 = vec.filter(lambda x: (x-2)%2==0)
r2.collect()

[0, 2, 4, 6]

In [13]:
### flatmap
r3 = r2.map(lambda x: list(range(x)))
print(r3.collect())
r4 = r2.flatMap(lambda x: list(range(x)))
r4.collect()

[[], [0, 1], [0, 1, 2, 3], [0, 1, 2, 3, 4, 5]]


[0, 1, 0, 1, 2, 3, 0, 1, 2, 3, 4, 5]

In [14]:
### glom
r5 = vec.glom()
r5.collect()

[[0, 1, 2], [3, 4, 5, 6]]

In [16]:
### mapPartitions
def f(iters):
    yield sum(iters)
r6 = vec.mapPartitions(f)
r6.collect(), r6.glom().collect()

([3, 18], [[3], [18]])

In [18]:
### mapPartitionsWithIndex
def f1(index, iters):
    yield index, sum(iters)
r7 = vec.mapPartitionsWithIndex(f1)
r7.collect(), r7.glom().collect()

([(0, 3), (1, 18)], [[(0, 3)], [(1, 18)]])

In [19]:
### getNumPartitions
vec.getNumPartitions()

2

In [21]:
### sample
vec.sample(0, 0.1).collect()

[0, 6]

In [27]:
### union
r8 = vec.union(vec)
r8.collect(), r8.getNumPartitions()

([0, 1, 2, 3, 4, 5, 6, 0, 1, 2, 3, 4, 5, 6], 4)

In [30]:
### intersection 交集
vec1 = sc.parallelize(list(range(3, 10)), 3)
r9 = vec1.intersection(vec)
r9.collect(), r9.getNumPartitions()

([5, 6, 3, 4], 5)

In [32]:
### distinct
r8.collect(), r8.distinct().collect()

([0, 1, 2, 3, 4, 5, 6, 0, 1, 2, 3, 4, 5, 6], [0, 4, 1, 5, 2, 6, 3])

In [33]:
### groupByKey
vec2 = sc.parallelize([(1,2), (1, 3), (2, 3), (1, 0), (3, 1), (2, 6)], 3)
r10 = vec2.groupByKey()
r10.collect()

[(3, <pyspark.resultiterable.ResultIterable at 0x109d0c5c0>),
 (1, <pyspark.resultiterable.ResultIterable at 0x109d0cda0>),
 (2, <pyspark.resultiterable.ResultIterable at 0x109d0ce48>)]

# spark RDD 行动操作

In [34]:
### reduce 先把前两个元素当做x,y 进行操作, 然后把结果当做下一个x, 之后的元素当做y进行计算
rd1 = vec.reduce(lambda x,y: x+y)
rd1

21

In [35]:
### collect

In [36]:
### count
vec.count()

7

In [37]:
### first
vec.first()

0

In [38]:
### take
vec.take(3)

[0, 1, 2]

In [39]:
### takeSample
vec.takeSample(0, 2)

[4, 3]

In [40]:
### saveAsTextFile
vec.saveAsTextFile('saveAsTextFile.demo.txt')

In [41]:
### countByKey
vec2.countByKey()

defaultdict(int, {1: 3, 2: 2, 3: 1})

In [44]:
### foreach  -> map+collect
rd2 = vec.foreach(lambda x: x+0.01)
vec.collect(), rd2

([0, 1, 2, 3, 4, 5, 6], None)

In [45]:
data_path = '../../data/small_car_price_train.201908.csv'
data_path1 = '../../data/small_car_price_test.201908.csv'

sqlContext = SQLContext(sc)
traindata = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load(data_path)
testdata = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load(data_path1)

In [52]:
traindata.count()

1509

In [53]:
testdata.count()

1094