In [1]:
from pyspark.sql import SparkSession
from pyspark import SparkContext,SparkConf
from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark.sql import functions
import numpy as np
import pandas as pd
from pyecharts import options as opts
from pyecharts.charts import Pie
from pyecharts.commons.utils import JsCode
from pyecharts.charts import Grid, Line, Scatter
from pyecharts.charts import Timeline
from pyecharts.faker import Faker

In [36]:
from pyecharts.charts import Map

In [2]:
# Start a Spark context on the local master for this notebook.
sc = SparkContext(master="local", appName="app1")

In [3]:
# Build (or reuse) the Spark session and request Arrow-backed toPandas() conversion.
spark = SparkSession.builder.appName("boye").getOrCreate()
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

# Load the CSV data set from HDFS: first row is the header, column types are inferred.
df = (
    spark.read
    .format('com.databricks.spark.csv')
    .option('header', 'true')
    .option('inferschema', 'true')
    .load("hdfs://localhost:9000/user/hadoop/PM25city")
)
print(df.dtypes)

[('站号', 'int'), ('经度', 'double'), ('纬度', 'double'), ('PM25', 'int'), ('PM10', 'int'), ('NO2', 'int'), ('SO2', 'int'), ('O3-1', 'int'), ('O3-8h', 'int'), ('CO', 'double'), ('AQI', 'int'), ('等级', 'int'), ('year', 'int'), ('month', 'int'), ('day', 'int'), ('hour', 'int'), ('city', 'string')]


In [4]:
# Remove duplicate rows.
# dropDuplicates() returns a NEW DataFrame and leaves the receiver untouched, so the
# result has to be rebound to `df`; otherwise every later cell still sees the duplicates.
df = df.dropDuplicates()

DataFrame[站号: int, 经度: double, 纬度: double, PM25: int, PM10: int, NO2: int, SO2: int, O3-1: int, O3-8h: int, CO: double, AQI: int, 等级: int, year: int, month: int, day: int, hour: int, city: string]

In [5]:
# Count the records; the column names are kept in `ls`.
count = df.count()
ls = df.columns
print("记录数：", count)

记录数： 398355


In [6]:
# Summary statistics (count / mean / stddev / min / max) of the PM25 column.
df.describe('PM25').show()

+-------+-----------------+
|summary|             PM25|
+-------+-----------------+
|  count|           398355|
|   mean|58.73549974269182|
| stddev|57.21569843647059|
|    min|                0|
|    max|              589|
+-------+-----------------+



# Question 1

In [7]:
# Take the lowest PM2.5 reading per station and day as that day's value,
# then average those daily minima per city and sort the cities ascending.
df_day_mean = (
    df.groupby("year", "month", "day", "站号", "city")
    .agg({"PM25": "min"})
)
df2 = (
    df_day_mean.groupby("city")
    .agg({"min(PM25)": "avg"})
    .sort("avg(min(PM25))")
)
df2.show()

+--------+------------------+
|    city|    avg(min(PM25))|
+--------+------------------+
|    海口| 9.840816326530613|
|    昆明|13.761270491803279|
|    厦门|14.846463022508038|
|呼和浩特|14.951289398280803|
|    上海| 23.26162962962963|
|    北京|26.387667304015295|
|    天津|27.450048496605238|
|    青岛|29.182879377431906|
|    成都| 34.78746327130264|
|    济南| 45.89268805891636|
|乌鲁木齐|52.323340471092074|
|    郑州| 59.52669160149609|
+--------+------------------+



In [62]:
import pyecharts.options as opts
from pyecharts.charts import Line
# Collect the per-city result once. Each toPandas() call runs its own Spark job,
# so a single collection halves the work and guarantees that the labels and the
# values plotted below come from the same set of rows.
pdf_min = df2.toPandas()
y = np.round(pdf_min['avg(min(PM25))'].values.tolist(), 2)
x_citys = pdf_min['city'].values.tolist()

# Line chart of the average daily-minimum PM2.5 per city, written to an HTML file.
# `c` holds whatever render() returns.
c = (
    Line()
    .add_xaxis(x_citys)
    .add_yaxis("PM2.5浓度", y)
    .set_global_opts(
        title_opts=opts.TitleOpts(title="全国各城市PM2.5浓度"),
        xaxis_opts=opts.AxisOpts(type_="category", boundary_gap=False),
    )
    .render("最小值.html")
)

In [8]:
# Take the highest PM2.5 reading per station and day as that day's value,
# then average those daily maxima per city (ascending); show the first 10 rows.
df1 = (
    df.groupby("year", "month", "day", "站号", "city")
    .agg({"PM25": "max"})
)
df3 = (
    df1.groupby("city")
    .agg({"max(PM25)": "avg"})
    .sort("avg(max(PM25))")
)
df3.show(10)

+--------+------------------+
|    city|    avg(max(PM25))|
+--------+------------------+
|    海口| 25.37142857142857|
|    昆明|38.329918032786885|
|    厦门|43.348874598070736|
|    上海| 60.56325925925926|
|    成都| 76.00293829578844|
|呼和浩特| 87.70773638968481|
|    青岛| 92.29036964980544|
|    北京| 93.68451242829828|
|    天津| 98.16553507921112|
|    济南|122.06470278800631|
+--------+------------------+
only showing top 10 rows



In [63]:
# Collect the per-city result once instead of running two separate Spark jobs;
# labels and values are then taken from the same collected rows.
pdf_max = df3.toPandas()
y_2 = np.round(pdf_max['avg(max(PM25))'].values.tolist(), 2)
x_citys_2 = pdf_max['city'].values.tolist()

# Line chart of the average daily-maximum PM2.5 per city, written to an HTML file.
c = (
    Line()
    .add_xaxis(x_citys_2)
    .add_yaxis("PM2.5浓度", y_2, is_connect_nones=True)
    .set_global_opts(
        xaxis_opts=opts.AxisOpts(type_="category", boundary_gap=False),
        title_opts=opts.TitleOpts(title="全国各城市PM2.5浓度"),
    )
    .render("平均值最大值.html")
)

In [9]:
# Average PM2.5 over all records of each city, sorted ascending.
df4 = (
    df.groupby('city')
    .agg({"PM25": "mean"})
    .sort("avg(PM25)")
)
df4.show()

+--------+------------------+
|    city|         avg(PM25)|
+--------+------------------+
|    海口|16.023941264098745|
|    昆明| 23.64524182778842|
|    厦门|27.681850789096128|
|    上海|41.380602802979745|
|呼和浩特| 41.75553613053613|
|    北京| 54.73764218135431|
|    成都| 56.21107737155351|
|    天津| 56.37377220858009|
|    青岛| 60.41360111859667|
|    济南| 83.21077401685767|
|乌鲁木齐| 83.67879768162825|
|    郑州| 94.13384405730912|
+--------+------------------+



In [64]:
# Collect the per-city result once instead of running two separate Spark jobs;
# labels and values are then taken from the same collected rows.
pdf_mean = df4.toPandas()
y_3 = np.round(pdf_mean['avg(PM25)'].values.tolist(), 2)
x_citys_3 = pdf_mean['city'].values.tolist()

# Line chart of the overall average PM2.5 per city, written to an HTML file.
c = (
    Line()
    .add_xaxis(x_citys_3)
    .add_yaxis("PM2.5浓度", y_3, is_connect_nones=True)
    .set_global_opts(
        xaxis_opts=opts.AxisOpts(type_="category", boundary_gap=False),
        title_opts=opts.TitleOpts(title="全国各城市PM2.5浓度"),
    )
    .render("平均值.html")
)

In [10]:
# Highest PM2.5 reading recorded in each city, sorted ascending.
df5 = (
    df.groupby("city")
    .agg({"PM25": "max"})
    .sort("max(PM25)")
)
df5.show()

+--------+---------+
|    city|max(PM25)|
+--------+---------+
|    海口|      167|
|    昆明|      288|
|    上海|      288|
|    厦门|      303|
|    成都|      359|
|    青岛|      425|
|    郑州|      521|
|呼和浩特|      530|
|    济南|      534|
|乌鲁木齐|      546|
|    天津|      589|
|    北京|      589|
+--------+---------+



# Question 2

In [11]:
# Report the air quality distribution of 北京, 上海 and 成都 throughout February 2019.
# Categories: Good / Moderate / Unhealthy / Very Unhealthy / Hazardous
df_2 = (
    df.filter("city == '北京' or city == '上海' or city=='成都' ")
    .filter("year==2019 and month==2")
)

In [12]:
data_2 = df_2.toPandas()
# Cap outlier AQI readings at 500.
# The assignment must be limited to the 'AQI' column: assigning through a row-only
# boolean mask (data_2[mask] = 500) writes 500 into EVERY column of the matching
# rows, which would also clobber city, day, station and pollutant values.
data_2.loc[data_2['AQI'] > 500, 'AQI'] = 500

  PyArrow >= 0.8.0 must be installed; however, it was not found.
Attempting non-optimization as 'spark.sql.execution.arrow.fallback.enabled' is set to true.


In [13]:
# (rows, columns) of the pandas frame built from the filtered Spark DataFrame.
data_2.shape

(42825, 17)

In [14]:
def set_air_quality(x):
    """Translate a numeric AQI value into its air-quality category label.

    Each band is closed on its upper bound: values up to 50 are 'good', up to
    100 'Moderate', up to 150 'Unhealthy for Sensitive Groups', up to 200
    'Unhealthy', up to 300 'Very Unhealthy'. Anything that matches no band
    (including values that compare False against every bound) is 'Hazardous'.
    """
    bands = (
        (50, 'good'),
        (100, 'Moderate'),
        (150, 'Unhealthy for Sensitive Groups'),
        (200, 'Unhealthy'),
        (300, 'Very Unhealthy'),
    )
    for upper_bound, label in bands:
        if x <= upper_bound:
            return label
    return 'Hazardous'
# Label every record with its air-quality category, derived from its AQI value.
data_2['air_quality'] = data_2['AQI'].apply(set_air_quality)

# Count how many records fall into each category per (day, city); the category
# reported most often on a day is later taken as the city's air quality that day.
df_air_quality = (
    data_2.groupby(['day', 'city'])['air_quality']
    .value_counts()
)

In [15]:
# Display the per-(day, city) category counts.
df_air_quality

day  city  air_quality                   
1    上海    Moderate                          297
           Unhealthy for Sensitive Groups    227
           good                               41
           Unhealthy                           1
     北京    Moderate                          339
                                            ... 
28   北京    Hazardous                          11
           good                                3
     成都    Moderate                          517
           Unhealthy for Sensitive Groups     20
           good                                8
Name: air_quality, Length: 304, dtype: int64

In [16]:
# Turn the sorted category counts into a flat frame with the columns
# day / city / air_quality / counts.
# The Series is named 'counts' explicitly rather than renaming a column afterwards:
# the name pandas gives a value_counts() result differs between versions
# ('air_quality' in older releases, 'count' from pandas 2.0 on), so a rename keyed
# on 'air_quality' silently does nothing on newer pandas and 'counts' never exists.
d = (
    df_air_quality
    .sort_values()
    .rename('counts')
    .reset_index()
)

In [17]:
# Order by day ascending and, within each day, by count descending.
air_dayly = d.sort_values(
    by=['day', 'counts'],
    ascending=[True, False],
)

In [18]:
# Preview the first ten rows of the day-ordered counts.
air_dayly.head(10)

Unnamed: 0,day,city,air_quality,counts
272,1,成都,Moderate,408
260,1,北京,Moderate,339
254,1,上海,Moderate,297
231,1,上海,Unhealthy for Sensitive Groups,227
192,1,北京,Unhealthy for Sensitive Groups,131
181,1,成都,Unhealthy for Sensitive Groups,117
118,1,北京,good,42
111,1,上海,good,41
88,1,成都,good,29
53,1,北京,Unhealthy,15


In [19]:
# For every (day, city) group keep the row(s) whose count equals the group's maximum,
# i.e. the category reported most often that day. Tied categories are all kept.
air_quality_Feb = d.groupby(['day', 'city']).apply(
    lambda grp: grp.loc[grp['counts'] == grp['counts'].max()]
)

In [20]:
# Re-wrap the groupby/apply result in a DataFrame before it is edited further.
air_quality_Feb = pd.DataFrame(air_quality_Feb)

In [21]:
# Drop the 'day' and 'city' columns (re-running this cell fails once they are gone).
air_quality_Feb = air_quality_Feb.drop(columns=['day', 'city'])

In [22]:
# Move the index levels back into ordinary columns.
air_quality_Feb = air_quality_Feb.reset_index()

In [23]:
# Display only: the result of drop() is not assigned, so `air_quality_Feb`
# itself still contains the 'level_2' column after this cell.
air_quality_Feb.drop(columns=['level_2'])

Unnamed: 0,day,city,air_quality,counts
0,1,上海,Moderate,297
1,1,北京,Moderate,339
2,1,成都,Moderate,408
3,2,上海,Moderate,550
4,2,北京,Unhealthy for Sensitive Groups,223
...,...,...,...,...
80,27,北京,Unhealthy for Sensitive Groups,196
81,27,成都,Moderate,384
82,28,上海,Moderate,427
83,28,北京,Moderate,173


In [24]:
# Number of days per (city, dominant air-quality category).
# The count column is named 'air_quality' explicitly: older pandas names a
# value_counts() result after the counted column, while pandas >= 2.0 names it
# 'count'. Pinning the name keeps the later rename of 'air_quality' to 'days'
# working on either version.
result = (
    air_quality_Feb.groupby(['city'])['air_quality']
    .value_counts()
    .rename('air_quality')
    .to_frame()
)
result

Unnamed: 0_level_0,Unnamed: 1_level_0,air_quality
city,air_quality,Unnamed: 2_level_1
上海,good,13
上海,Moderate,11
上海,Unhealthy for Sensitive Groups,4
北京,Moderate,12
北京,good,10
北京,Unhealthy for Sensitive Groups,3
北京,Very Unhealthy,3
北京,Hazardous,1
成都,Moderate,24
成都,Unhealthy for Sensitive Groups,2


In [26]:
# Prepare the statistics for visualisation: name the count column 'days' and
# move the (city, air_quality) index levels back into ordinary columns.
result = result.rename(columns={"air_quality": "days"}).reset_index()

In [27]:
# For each city collect, in matching order, its category labels and day counts.
result_citys = ['上海', '北京', '成都']
result_air_quality = []
result_counts = []
for city in result_citys:
    city_rows = result[result['city'] == city]
    result_air_quality.append(city_rows['air_quality'].values.tolist())
    result_counts.append(city_rows['days'].values.tolist())

In [28]:
def _donut(idx, center, title_opts, legend_opts):
    """Build the ring chart for the city at position `idx` of `result_citys`.

    Slices are the (category, days) pairs of that city; `center` positions the
    ring, `title_opts` / `legend_opts` are passed through to set_global_opts().
    """
    slices = [list(pair) for pair in zip(result_air_quality[idx], result_counts[idx])]
    return (
        Pie()
        .add("", slices, center=center, radius=[60, 80])
        .set_global_opts(title_opts=title_opts, legend_opts=legend_opts)
    )


# 上海: legend hidden.
pie = _donut(
    0,
    ["20%", "30%"],
    opts.TitleOpts(title="上海空气质量情况"),
    opts.LegendOpts(is_show=False),
)

# 北京: the only chart that shows a (scrollable, vertical) legend.
pie2 = _donut(
    1,
    ["60%", "30%"],
    opts.TitleOpts(title="北京空气质量情况", pos_left="40%"),
    opts.LegendOpts(type_="scroll", pos_top="60%", pos_left="41%", orient="vertical"),
)

# 成都: legend hidden, title placed near the bottom.
pie3 = _donut(
    2,
    ["20%", "70%"],
    opts.TitleOpts(title="成都空气质量情况", pos_top="90%"),
    opts.LegendOpts(is_show=False),
)

# Combine the three rings into one page and write it to an HTML file.
grid = (
    Grid()
    .add(pie, grid_opts=opts.GridOpts(pos_left="55%"))
    .add(pie2, grid_opts=opts.GridOpts(pos_right="30%"))
    .add(pie3, grid_opts=opts.GridOpts(pos_right="20%"))
    .render("grid_horizontal.html")
)

# Question 3

In [29]:
# Analyse the main air pollutants in each city
# Environmental protection measures

## 成都市 2018-2019年主要污染物

In [25]:
# Keep only the 成都 records.
df_chengdu = df.where("city=='成都'")

In [26]:
# Mean concentration of each pollutant per (year, month).
# Note: despite the name, the grouping is monthly, not daily.
df_chengdu_daily = df_chengdu.groupby('year', 'month').agg(
    {
        'NO2': 'mean',
        "SO2": "mean",
        "PM25": "mean",
        "PM10": "mean",
        "CO": "mean",
        "O3-8h": "mean",
    }
)

In [27]:
# Display the aggregated frame's schema (the bare expression shows its repr).
df_chengdu_daily

DataFrame[year: int, month: int, avg(O3-8h): double, avg(SO2): double, avg(NO2): double, avg(CO): double, avg(PM25): double, avg(PM10): double]

In [28]:
# Slice the monthly averages into the four months that are charted.
df_2018_8, df_2019_1, df_2019_2, df_2019_6 = [
    df_chengdu_daily.filter(month_cond)
    for month_cond in (
        'year==2018 and month==8',
        'year==2019 and month==1',
        'year==2019 and month==2',
        'year==2019 and month==6',
    )
]

In [29]:
# Labels of the four charted months, in the same order as `data`.
time = ['2018-08', '2019-01', '2019-02', '2019-06']

# For each month: collect its row, drop the year/month keys and round the
# remaining pollutant averages to two decimals.
data = [
    np.round(monthly.toPandas().drop(columns=['year', 'month']).values[0], 2)
    for monthly in (df_2018_8, df_2019_1, df_2019_2, df_2019_6)
]

  PyArrow >= 0.8.0 must be installed; however, it was not found.
Attempting non-optimization as 'spark.sql.execution.arrow.fallback.enabled' is set to true.


In [337]:
from pyecharts import options as opts
from pyecharts.charts import Pie, Timeline
from pyecharts.faker import Faker

attr = Faker.choose()  # NOTE(review): not referenced by any visible cell — confirm before removing

# `pollutant` was not defined by any earlier cell, so this cell failed with a
# NameError on a fresh kernel. Derive the labels from the aggregated frame's own
# columns ('avg(NO2)' -> 'NO2'): each row of `data` was built from those same
# columns minus year/month, so labels and values line up positionally.
pollutant = [
    col[len('avg('):-1]
    for col in df_chengdu_daily.columns
    if col.startswith('avg(')
]

tl = Timeline()

# One rose-type pie per month, added to the timeline under that month's label.
for i, month_label in enumerate(time):
    pie = (
        Pie()
        .add(
            "日平均浓度",
            [list(z) for z in zip(pollutant, data[i])],
            rosetype="radius",
            radius=["30%", "55%"],
        )
        .set_global_opts(
            title_opts=opts.TitleOpts(month_label + ' 成都市空气污染物'),
            legend_opts=opts.LegendOpts(pos_left="35%"),
        )
    )
    tl.add(pie, month_label)
tl.render("timeline_pie.html")

'/home/hadoop/bigdata/timeline_pie.html'

## 全国污染情况

In [38]:
# Monthly mean AQI per city, ignoring readings above 500 and excluding 青岛.
df_nation = (
    df.filter("AQI<=500 and city!='青岛'")
    .groupby('city', 'year', 'month')
    .agg({"AQI": "mean"})
)

# Collect the four charted months as pandas frames.
df_aug, df_jan, df_feb, df_june = [
    df_nation.filter(month_cond).toPandas()
    for month_cond in (
        'year==2018 and month==8',
        'year==2019 and month==1',
        'year==2019 and month==2',
        'year==2019 and month==6',
    )
]

In [39]:
# City -> province name used for the map regions.
province = {
    "成都": "四川",
    "海口": "海南",
    "天津": "天津",
    "厦门": "福建",
    "呼和浩特": "内蒙古",
    "青岛": "山东",
    "昆明": "云南",
    "郑州": "河南",
    "乌鲁木齐": "新疆",
    "济南": "山东",
    "北京": "北京",
    "上海": "上海",
}

# Per charted month: rounded mean AQI values, city names and province names.
# All three lists are built from the SAME rows. Previously a city missing from
# `province` was skipped only when building `provinces`, which made that list
# shorter than `aqis`; zip() would then pair AQI values with the wrong provinces.
# Rows for unmapped cities are now dropped from all three lists together.
aqis = []
citys = []
provinces = []
for monthly in (df_aug, df_jan, df_feb, df_june):
    mapped = monthly[monthly['city'].isin(list(province))]
    month_citys = mapped['city'].values.tolist()
    aqis.append(np.round(mapped['avg(AQI)'].values.tolist(), 2))
    citys.append(month_citys)
    provinces.append([province[name] for name in month_citys])

In [40]:
# One China map per charted month, coloured by the mean AQI of each province,
# added to a timeline and written to an HTML file.
tl = Timeline()
time = ['2018-08', '2019-01', '2019-02', '2019-06']
for i, month_label in enumerate(time):
    region_values = [list(pair) for pair in zip(provinces[i], aqis[i])]
    map0 = Map().add(
        "",
        region_values,
        "china",
        label_opts=opts.LabelOpts(is_show=False),
    )
    map0 = map0.set_global_opts(
        title_opts=opts.TitleOpts(title=month_label + "全国空气质量情况"),
        visualmap_opts=opts.VisualMapOpts(),
    )
    tl.add(map0, month_label)
tl.render("timeline_map.html")

'/home/hadoop/bigdata/timeline_map.html'