### 30 Days of Spark

#### 任务1：PySpark数据处理

*    步骤1：使用Python链接Spark环境
*    步骤2：创建dateframe数据
*    步骤3：用spark执行以下逻辑：找到数据行数、列数
*    步骤4：用spark筛选class为1的样本
*    步骤5：用spark筛选language >90 或 math> 90的样本


In [None]:
# 1、使用python链接Spark环境
import pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName('pyspark') \
    .getOrCreate()
# 原始数据 
# 2、创建dataframe数据
test = spark.createDataFrame([('001','1',100,87,67,83,98), ('002','2',87,81,90,83,83), ('003','3',86,91,83,89,63),
                            ('004','2',65,87,94,73,88), ('005','1',76,62,89,81,98), ('006','3',84,82,85,73,99),
                            ('007','3',56,76,63,72,87), ('008','1',55,62,46,78,71), ('009','2',63,72,87,98,64)],
                             ['number','class','language','math','english','physic','chemical'])
test.show()

##### 找到数据的行数和列数

In [None]:
# 方法一
column_len = len(test.columns)
print("The length of DataFrame's columns is %s" % column_len)

In [None]:
# 方法一
row_len = len(test.collect())
print("The length of DataFrame's rows is %s" % row_len)

In [None]:
# 方法二
shape = (test.count(), len(test.columns))

print("The length of DataFrame's rows is %s" % shape[0])
print("The length of DataFrame's columns is %s" % shape[1])

In [None]:
# 用spark筛选class为1的样本
test.filter(test['class'] == 1).show()

In [None]:
# 用spark筛选language>90 或math>90的样本
test.filter((test['language'] > 90) | (test['math'] > 90)).show()

-----------------------------

#### 任务2：PySpark数据统计

* 步骤1：读取文件https://cdn.coggle.club/Pokemon.csv
* 步骤2：将读取的进行保存，表头也需要保存
* 步骤3：分析每列的类型，取值个数
* 步骤4：分析每列是否包含缺失值


In [None]:
from pyspark import SparkFiles

# 读取文件
spark.sparkContext.addFile('https://cdn.coggle.club/Pokemon.csv')

# 将读取的进行保存
df = spark.read.csv("file://"+SparkFiles.get("Pokemon.csv"), header=True, inferSchema= True)
df = df.withColumnRenamed('Sp. Atk', 'Sp Atk')
df = df.withColumnRenamed('Sp. Def', 'Sp Def')

In [None]:
df.show()

##### 分析每一列的类型和取值个数

In [None]:
# 方法一
df.dtypes

In [None]:
# 方法二
df.printSchema()

In [None]:
df.select('Name').count()

In [None]:
# 方法一：以去重的思想去分析列中的取值个数
# 可采用两种方法

# df.select('Name').drop_duplicates().count()

df.select('Name').distinct().count()

In [None]:
columns_list = df.columns

In [None]:
columns_list

In [None]:
for i in columns_list:
    value = df.select(i).drop_duplicates().count()
    print("列 %s 的取值为：%s" % (i, value))

In [None]:
# 方法二：使用聚合函数 countDistinct
import pyspark.sql.functions as F
for i in columns_list:
    print(df.agg(F.countDistinct(i).alias(i)).collect())



> 会发现上面的两个结果中，对于列“Type 2”的结果有所不同， 检查数据后发现是因为“Type 2”中包含有却是之的数据，在第一种方法中，会将空值“NULL”当作一个值去统计，而使用`countDisinct`函数，他会排除出空值数据后再进行统计。
> 下面先分析每列中是否包含有缺失值，然后再重新使用方法一统计。

##### 分析每列是否包含缺失值

In [None]:
# 增加对每一列进行去重处理后再统计取值
for i in columns_list:
    value = df.select(i).dropna().drop_duplicates().count()
    print("列 %s 的取值为：%s" % (i, value))

In [None]:
#统计每列数据缺失占比情况
df.agg(*[(1 - (F.count(c) / F.count('*'))).alias(c) for c in df.columns]).show()

In [None]:
# 分析每列中缺失值个数
df_agg = df.agg(*[F.count(F.when(F.isnull(c), c)).alias(c) for c in df.columns])

In [None]:
df_agg.show()

---------------------------------------------------

#### 任务三：

* 步骤1：读取文件https://cdn.coggle.club/Pokemon.csv
* 步骤2：学习groupby分组聚合的使用
* 步骤3：学习agg分组聚合的使用
* 步骤4：学习transform的使用
* 步骤5：使用groupby、agg、transform，统计数据在Type 1分组下 HP的均值

In [None]:
from pyspark import SparkFiles

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName('pyspark') \
    .getOrCreate()
spark.sparkContext.addFile('Pokemon.csv')

# 在windows下需要将file:// 改为file:///
df = spark.read.csv("file:///"+SparkFiles.get("Pokemon.csv"), header=True, inferSchema= True)
df = df.withColumnRenamed('Sp. Atk', 'Sp Atk')
df = df.withColumnRenamed('Sp. Def', 'Sp Def')

##### 步骤2：学习groupby分组聚合的使用

PySpark DataFrame 还提供了一种使用常用方法拆分-应用-组合策略来处理分组数据的方法。按特定条件对数据进行分组，对每个组应用一个函数，然后将它们组合回 DataFrame。

In [None]:
df.show()

`df.groupby()`后可以使用自带基本统计功能的方法得到对应的结果（类似Pandas中GroupBy的用法）：

其中可以指定返回某一列或某几列的统计结果。

* `.count()`：返回每一组的数量，也就是行数。
* `.mean()`：返回每一组的mean。
* `.avg()`： 返回每一组的average。
* `.sum()`：返回每一组的总和。
* `.max()`：返回每一组的最大值。
* `.min()`：返回每一组的最小值。


> 均值(mean)是对恒定的真实值进行测量后，把测量偏离于真实值的所有值进行平均所得的结果；平均值(average)直接对一系列具有内部差异的数值进行的测量值进行的平均结果。均值是“观测值的平均”，平均值是“统计量的平均”

In [None]:
# 按照某一个字段分组 并统计各组的数量
df.groupby('Type 1').count().show()

In [None]:
# 按照某一个字段分组 并统计各组的平均值
df.groupby('Type 1').mean("Total", "HP").show()

In [None]:
# 按照某一个字段分组 并统计各组的平均值
df.groupby('Type 1').avg().show()

In [None]:
# 按照某一个字段分组 并统计各组各字段的最大值
df.groupby('Type 1').max().show()

In [None]:
# 按照某一个字段分组 返回指定字段的最大值
df.groupby('Type 1').max("HP").show()

In [None]:
# 按照某一个字段分组 并统计各组各字段的最小值
df.groupby('Type 1').min().show()

In [None]:
# 按照某一个字段分组 并统计各组各字段的总和
df.groupby('Type 1').sum().show()

##### 步骤3：学习agg分组聚合的使用

使用 agg() 函数，可以一次计算多个聚合。即可以对多列使用不同的集合函数进行聚合



In [None]:
from pyspark.sql.functions import sum,avg,max,min,mean,count
df.groupby('Type 1','Type 2').agg(count('HP').alias('总数'),
                        max('HP').alias('最大HP值'),
                        min('Attack').alias('最小攻击力')).show()

在 PySpark DataFrame 上，可以使用 where() 或 filter() 函数来过滤聚合数据的行

In [None]:
from pyspark.sql.functions import sum,avg,max,min,mean,count,col
df.groupby('Type 1','Type 2').agg(count('HP').alias('总数'),
                        max('HP').alias('最大HP值'),
                        min('Attack').alias('最小攻击力')) \
                        .where(col('最小攻击力')>=40).show()

##### 步骤4：学习transform的使用

返回一个新的 DataFrame。主要用于调用自定义的函数去处理DataFrame。

In [None]:
df.show()

In [None]:
# def cast_all_to_float(input_df):
#     return input_df.select([(col(col_name) + 10) for col_name in input_df.columns])
def sort_columns_asc(input_df):
    return input_df.select(*sorted(input_df.columns))
df.transform(sort_columns_asc).show()



##### 步骤5：使用groupby、agg、transform，统计数据在Type 1分组下 HP的均值

In [None]:
# 按照Type 1分组 并统计HP的均值
df.groupby('Type 1').mean('HP').show()

In [None]:
# 按照Type 1分组 并统计HP的均值
from pyspark.sql.functions import mean

df.groupby('Type 1').agg(mean('HP')).alias('Mean of HP').show()

In [None]:
aa = {'aa' : {'Value':'nm','ed':'3'}}
if 'aa' in aa:
    print(1)


In [None]:
type1_df = df.select('Type 1', 'HP')
# type1_df.show()
rows = type1_df.collect()
# {Type 1: value:{HP:, count:}}
result_dict = {}
# d = [{'name': 'Alice', 'age': 1}]
# output = spark.createDataFrame(d).collect()
for row in rows:
    if row['Type 1'] not in result_dict:
        # key
        result_dict[row['Type 1']] = {}

        result_dict[row['Type 1']]['HP'] = row['HP']
        result_dict[row['Type 1']]['count'] = 1
    else:
        result_dict[row['Type 1']]['HP'] += row['HP']
        result_dict[row['Type 1']]['count'] += 1

print(result_dict)

In [None]:
result_df = []
for k, v in result_dict.items():
    temp = {'Type 1':k,'mean':v['HP'] / v['count']}
    # temp[k] = v['HP'] / v['count']
    result_df.append(temp)
result_df


In [None]:
output = spark.createDataFrame(result_df).collect()
print(output)

In [None]:
# 按照Type 1分组 并统计HP的均值
def com_mean(input_df):

    return input_df.groupby('Type 1').mean('HP')

df.transform(com_mean).show()
    