In [1]:
import os
from pyspark import SparkContext
from pyspark.sql import SparkSession

os.environ["PYSPARK_PYTHON"]="python3"
os.environ["PYSPARK_DRIVER_PYTHON"]="python3"
os.environ['SPARK_HOME'] = "/home/mayfly/workstation/env/spark"

In [2]:
file_path = "file:///home/mayfly/workstation/mycode/spark_study/pat_sort.txt"
# 创建sparkcontext对象
sc = SparkContext(appName="pat_sort", master="local")
# 加载文件
mrdd = sc.textFile(file_path)

In [3]:
# 根据第一行获取数据信息，第二行获取最高分信息
# mrdd = mrdd.zipWithIndex()
data_info, score_info = mrdd.collect()[:2]
data_info = data_info.split(" ")
score_info = score_info.split(" ")
man_num = int(data_info[0])
type_num = sc.broadcast(int(data_info[1]))   # 将种类数广播出去
data_num = int(data_info[2])

In [4]:
# 过滤掉开头的两行，并按照"id problem_id":"score"划分数据
mrdd = mrdd.zipWithIndex().filter(lambda x:x[1]>1).map(lambda x:(" ".join(x[0].split(" ")[:2]), x[0].split(" ")))
print(mrdd.collect())

[('00002 2', ['00002', '2', '12']), ('00007 4', ['00007', '4', '17']), ('00005 1', ['00005', '1', '19']), ('00007 2', ['00007', '2', '25']), ('00005 1', ['00005', '1', '20']), ('00002 2', ['00002', '2', '2']), ('00005 1', ['00005', '1', '15']), ('00001 1', ['00001', '1', '18']), ('00004 3', ['00004', '3', '25']), ('00002 2', ['00002', '2', '25']), ('00005 3', ['00005', '3', '22']), ('00006 4', ['00006', '4', '-1']), ('00001 2', ['00001', '2', '18']), ('00002 1', ['00002', '1', '20']), ('00004 1', ['00004', '1', '15']), ('00002 4', ['00002', '4', '18']), ('00001 3', ['00001', '3', '4']), ('00001 4', ['00001', '4', '2']), ('00005 2', ['00005', '2', '-1']), ('00004 2', ['00004', '2', '0'])]


In [5]:
# 多次提交取最佳成绩
mrdd = mrdd.reduceByKey(lambda x,y:x[2]>=y[2] and x or y)
print(mrdd.collect())
mrdd.cache()

[('00002 2', ['00002', '2', '25']), ('00007 4', ['00007', '4', '17']), ('00005 1', ['00005', '1', '20']), ('00007 2', ['00007', '2', '25']), ('00001 1', ['00001', '1', '18']), ('00004 3', ['00004', '3', '25']), ('00005 3', ['00005', '3', '22']), ('00006 4', ['00006', '4', '-1']), ('00001 2', ['00001', '2', '18']), ('00002 1', ['00002', '1', '20']), ('00004 1', ['00004', '1', '15']), ('00002 4', ['00002', '4', '18']), ('00001 3', ['00001', '3', '4']), ('00001 4', ['00001', '4', '2']), ('00005 2', ['00005', '2', '-1']), ('00004 2', ['00004', '2', '0'])]


PythonRDD[7] at collect at <ipython-input-5-99342f6ab0a3>:3

In [6]:
# 将每个人的分数合并在一起
score_rdd = mrdd.map(lambda x:(x[1][0], x[1])).sortByKey()
print(score_rdd.collect())
score_rdd = score_rdd.reduceByKey(lambda x, y:[x[0], ",".join([x[1], y[1]]), ",".join([x[2], y[2]])])
# score_rdd = score_rdd.reduceByKey(lambda x, y:[x[0], str(int(x[2])+int(y[2]))])
print(score_rdd.collect())
# 将RDD持久化到内存中，避免重复计算


[('00001', ['00001', '1', '18']), ('00001', ['00001', '2', '18']), ('00001', ['00001', '3', '4']), ('00001', ['00001', '4', '2']), ('00002', ['00002', '2', '25']), ('00002', ['00002', '1', '20']), ('00002', ['00002', '4', '18']), ('00004', ['00004', '3', '25']), ('00004', ['00004', '1', '15']), ('00004', ['00004', '2', '0']), ('00005', ['00005', '1', '20']), ('00005', ['00005', '3', '22']), ('00005', ['00005', '2', '-1']), ('00006', ['00006', '4', '-1']), ('00007', ['00007', '4', '17']), ('00007', ['00007', '2', '25'])]
[('00001', ['00001', '1,2,3,4', '18,18,4,2']), ('00002', ['00002', '2,1,4', '25,20,18']), ('00004', ['00004', '3,1,2', '25,15,0']), ('00005', ['00005', '1,3,2', '20,22,-1']), ('00006', ['00006', '4', '-1']), ('00007', ['00007', '4,2', '17,25'])]


In [7]:
# 计算总分并对得分按照score,solved num,id的优先级排序，由于spark的默认排序方式稳定，所以分三次排序
def cul_sum_score(x):
    print(x)
    score_list = ["-" for i in range(type_num.value)]
    sum_score = 0
    type_id = x[1].split(",")
    man_score = x[2].split(",")
    solved_num = 0
    for index, t in enumerate(type_id):
        if man_score[index]=="-1":
            continue
        solved_num += 1
        score_list[int(t)-1] = man_score[index]
        sum_score += int(man_score[index])
    if sum_score>0:
        return [str(solved_num), x[0], str(sum_score), " ".join(score_list)]
score_rdd = score_rdd.map(lambda x:(x[0], cul_sum_score(x[1]))).filter(lambda x:x[1] is not None).sortBy(lambda x:x[1][1], ascending=True).sortBy(lambda x:x[1][0], ascending=True).sortBy(lambda x:x[1][2], ascending=False)
print(score_rdd.collect())

[('00002', ['3', '00002', '63', '20 25 - 18']), ('00005', ['2', '00005', '42', '20 - 22 -']), ('00007', ['2', '00007', '42', '- 25 - 17']), ('00001', ['4', '00001', '42', '18 18 4 2']), ('00004', ['3', '00004', '40', '15 0 25 -'])]


In [8]:
# 根据排序结果生成序列号(生成index:rdd对)
ran_rdd = score_rdd.zipWithIndex().map(lambda x:(x[1], x[0][1]))
# 根据成绩分组
ran_rdd = ran_rdd.groupBy(lambda x:x[1][2]).collect()
# print(trdd)
# 输出排序结果，每一组的排名按照本组内的最小排名计算
flag = "-"
rem_index=-1
for index, t in enumerate(ran_rdd):
    for tt in t[1]:
        if flag == tt[1][2]:
            print("{} {}".format(rem_index, " ".join(tt[1][1:])))
        else:
            flag = tt[1][2]
            rem_index = tt[0]+1
            print("{} {}".format(rem_index, " ".join(tt[1][1:])))

1 00002 63 20 25 - 18
2 00005 42 20 - 22 -
2 00007 42 - 25 - 17
2 00001 42 18 18 4 2
5 00004 40 15 0 25 -
