# 大数据期末作业
> 李劲哲
> 2020213697

## 目录
* 数据介绍
* 实验思路
* 实验代码
* 实验总结

## 数据介绍
* 本实验的数据集是steam上的游戏评论，主要包括的信息为游戏id，游戏名称，游戏评论，游戏评分，游戏推荐
* 原始数据集的大小约为2个G，实际使用的不包括游戏评论，所以在实验开始的时候，便对数据集进行了一定的处理，将数据集复制了三遍，最后数据的条数达到两千万条。

## 实验思路
* 本实验是从读取数据开始，一直到每个数据的分析，共分为了九个任务，对每个任务在不同的核数下的运行时间进行记录，最后汇总到一张图上进行呈现
* 本实验测试的核数为1，2，4， 8， 16

## 实验代码

In [1]:
from __future__ import division
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import re
import pandas as pd
import matplotlib.pyplot as plt
from pyspark.sql.functions import isnull
import datetime
import numpy as np

### 这是读取文件的任务

In [2]:
def get_data():
    contents = spark.read.csv('data*.csv', header=True)
    contents.count()
    return contents

### 这是对每个游戏的评论数进行的统计

In [3]:
def game_review(df):
    game_counts = df.groupBy('app_name').agg(F.count('review_text'))
    gc = game_counts.collect()
    return gc

### 这是对每个游戏的得分的一个统计，得分处主要是-1， 1

In [4]:
def game_score(df):
    scores = df.groupBy('app_name').agg(F.sum('review_score'))
    s = scores.collect()
    return s

### 这是对每个游戏的给出1得分的统计

In [5]:
def game_score_pos(df):
    pos_score = df.select('app_name', 'review_score').filter(df['review_score'] == 1).groupBy('app_name').agg(F.count('review_score'))
    ps = pos_score.collect()
    return ps

### 这是对每个游戏-1得分的统计

In [6]:
def game_score_neg(df):
    neg_score = df.select('app_name', 'review_score').filter(df['review_score'] == -1).groupBy('app_name').agg(F.count('review_score'))
    ns = neg_score.collect()
    return ns

### 这个是对每个游戏是否值得推荐信息的统计，主要分为了0与1，统计愿意推荐的人数

In [7]:
def game_vote_pos(df):
    pos_vote = df.select('app_name', 'review_votes').filter(df['review_votes'] == 1).groupBy('app_name').agg(F.count('review_votes'))
    pv = pos_vote.collect()
    return pv

### 这个是对不推荐的人数统计

In [8]:
def game_vote_neg(df):
    neg_vote = df.select('app_name', 'review_votes').filter(df['review_votes'] == 0).groupBy('app_name').agg(F.count('review_votes'))
    nv = neg_vote.collect()
    return nv

### 这个是对平均推荐度的一个统计，值得注意的对进来的数据先进行了处理，也就是这个游戏的评价人数不能太少

In [9]:
def game_avg_vote(df):
    avg_vote = contents.select('app_name', 'review_votes').groupBy('app_name').agg(F.avg('review_votes'))\
                                                    .orderBy('avg(review_votes)', ascending=False)
    av = avg_vote.collect()
    return av

### 这个是对数据进行一个处理，希望统计出推荐数大于500000的游戏

In [10]:
def act_info(df):
    better_info = df.groupBy('app_name').agg(F.count('review_votes'))
    better_info_new = better_info.select('app_name', 'count(review_votes)').filter(better_info['count(review_votes)']  >= 500000).collect()
    return better_info_new

### 这个是第一个画图函数，主要表示的是一个对上文每个游戏的评论数的折线图的统计, 也可以用于对每个游戏的得分进行绘制

In [11]:
def pic_one(gc):
    game_index = [k + 1 for k in range(len(gc))]
    review_count = [int(rc[1]) for rc in gc]
    plt.figure(figsize=(10, 5))
    plt.plot(game_index, review_count, 'b-', label='review_count')
    plt.show()

### 这个是第二个画图函数，主要表现的是对游戏的得分或者是推荐度的的两种类别进行统计，这里都是只取五十个游戏展示

In [12]:
def pic_two(pos, neg):
    labels = []
    pos_nums = []
    neg_nums = []
    counts = 0
    category_names = ['positive', 'negtive']
    for i in range(2000):
        if pos[i][1] > 500 or neg[i][1] > 500:
            continue
        labels.append(pos[i][0])
        pos_nums.append(pos[i][1])
        neg_nums.append(neg[i][1])
        counts += 1
        if counts >= 50:
            break
    x = np.arange(len(labels))
    width = 0.35
    plt.figure(figsize=(25, 25))
    fig, ax = plt.subplots()
    rects1 = ax.bar(x - width/2, pos_nums, width, label='pos')
    rects2 = ax.bar(x + width/2, neg_nums, width, label='neg')
    ax.set_ylabel('counts')
    ax.set_title('counts pos and neg')
    ax.set_xticks(x, labels)
    ax.legend()

    fig.tight_layout()

    plt.show()

### 这个是第三种图像的函数，主要表现是进行一个折现图统计得分

In [13]:
def pic_three(df):
    game_index = [k + 1 for k in range(len(df))]
    review_count = [int(rc[1]) for rc in df]
    plt.figure(figsize=(10, 5))
    plt.plot(game_index, review_count, 'g-', label='test1')
    plt.show()

### 这个是第四种图像，主要是对1，2，4，8，16核任务进行处理的时间进行统计的折线图

In [14]:
def pic_four(t):
    plt.figure(figsize=(10, 5))
    time_index = [1, 2, 3, 4, 5]
    plt.plot(time_index, t[0], 'g-', label='test1')
    plt.plot(time_index, t[1], 'b-', label='test2')
    plt.plot(time_index, t[2], 'r-', label='test3')
    plt.plot(time_index, t[3], 'y-', label='test4')
    plt.plot(time_index, t[4], 'c-', label='test5')
    plt.plot(time_index, t[5], 'm-', label='test6')
    plt.plot(time_index, t[6], 'k-', label='test7')
    plt.plot(time_index, t[7], 'w-', label='test8')
    plt.plot(time_index, t[8], 'g-', label='test9')
    plt.show()

### 这是主函数需要对整个内核数进行调度

In [None]:
# 这里表示了，我需要运行的内核数
cores = [1, 2, 4, 8, 16]
# 这里将会对每个任务的时间进行统计
# 任务1：读取文件
# 任务2：对每个作业的评论数进行统计
# 任务3：对每个游戏的总计得分进行统计
# 任务4：对得分是1的个数进行统计
# 任务5：对得分是-1的个数进行统计
# 任务6：对愿意推荐的进行统计
# 任务7：对不愿意推荐的进行统计
# 任务8：选出推荐有效个数大于2000的游戏
# 任务9：计算这些结果的平均推荐度
test1 = []
test2 = []
test3 = []
test4 = []
test5 = []
test6, test7 = [], []
test8, test9 = [], []
test = []
for i in cores:
    print(i)
    spark = SparkSession.builder.config("spark.default.parallelism", "1280") \
                        .config('spark.driver.memory', '10G') \
                        .config('spark.rpc.message.maxSize', '2000') \
                        .appName("spark sql") \
                        .master("local[%d]" % i) \
                        .getOrCreate()

    # 任务1
    start1 = datetime.datetime.now()
    contents = get_data()
    end1 = datetime.datetime.now()
    test1.append((end1 - start1).seconds)
    # 任务2
    start2 = datetime.datetime.now()
    gc = game_review(contents)
    end2 = datetime.datetime.now()
    test2.append((end2 - start2).seconds)
    # 任务3
    start3 = datetime.datetime.now()
    s = game_score(contents)
    end3 = datetime.datetime.now()
    test3.append((end3 - start3).seconds)
    # 任务4
    start4 = datetime.datetime.now()
    ps = game_score_pos(contents)
    end4 = datetime.datetime.now()
    test4.append((end4 - start4).seconds)
    # 任务5
    start5 = datetime.datetime.now()
    ns = game_score_neg(contents)
    end5 = datetime.datetime.now()
    test5.append((end5 - start5).seconds)
    # 任务6
    start6 = datetime.datetime.now()
    pv = game_vote_pos(contents)
    end6 = datetime.datetime.now()
    test6.append((end6 - start6).seconds)
    # 任务7
    start7 = datetime.datetime.now()
    nv = game_vote_neg(contents)
    end7 = datetime.datetime.now()
    test7.append((end7 - start7).seconds)
    # 任务8
    start8 = datetime.datetime.now()
    bi = act_info(contents)
    end8 = datetime.datetime.now()
    test8.append((end8 - start8).seconds)
    # 任务9
    start9 = datetime.datetime.now()
    av = game_avg_vote(bi)
    end9 = datetime.datetime.now()
    test9.append((end9 - start9).seconds)
    # 关闭本阶段的内核。进行下一阶段
    spark.stop()
test.append(test1)
test.append(test2)
test.append(test3)
test.append(test4)
test.append(test5)
test.append(test6)
test.append(test7)
test.append(test8)
test.append(test9)
pic_four(test)


1


## 实验总结
* 第一张图分析了每个游戏所有的评论数量
![](p1.png)

* 第二张图分析了每个游戏的具体总得分数
![](p2.png)

* 第三张图分析了五十个游戏的正面评分与负面评分的个数
![](p3.png)