In [1]:
import socket
# Look up the IPv4 address that this machine's own hostname resolves to and show it;
# later cells use `ipaddress` to build the Spark master URL and the driver host.
ipaddress = socket.gethostbyname(
    socket.gethostname()
)
print(ipaddress)

172.30.2.5


In [2]:
from pyspark.sql import SparkSession

In [3]:
# Create (or reuse) a session against the Spark master at spark://<ipaddress>:7077,
# advertising the same address as the driver host.
spark = (
    SparkSession.builder
    .master("spark://" + ipaddress + ":7077")
    .config("spark.driver.host", ipaddress)
    .getOrCreate()
)

In [4]:
# Bare expression: the notebook shows the session object as this cell's output.
spark

In [None]:
# Read everything matching the "realtime/2018*" path pattern in the "openaq-fetches"
# bucket (s3a:// scheme) as JSON into a Spark DataFrame.
smog_df = spark.read.json("s3a://openaq-fetches/realtime/2018*")

In [6]:
# Print the column tree of the freshly loaded dataframe (before it is flattened below).
smog_df.printSchema()

In [8]:
# Count the rows of the loaded dataframe.
smog_df.count()

188198452

In [9]:
from pyspark.sql.functions import col, min, max, avg

In [10]:
# Flatten the dataframe: lift date.local / date.utc out of the `date` struct under
# new names and expand the fields of `coordinates` into top-level columns.
# `averagingPeriod` is selected as-is and therefore stays a struct.
# NOTE: this rebinds `smog_df`; re-running the cell on the already-flattened frame
# cannot work because the `date` and `coordinates` columns are no longer present.
smog_df = smog_df.select(
    "location", "city", "country", "parameter", "unit", "value",
    col("date.local").alias("date_local"),
    col("date.utc").alias("date_utc"),
    "coordinates.*",
    "averagingPeriod", "sourceName", "sourceType", "mobile"
)

In [11]:
# Print the schema again to confirm the flattened column layout.
smog_df.printSchema()

root
 |-- location: string (nullable = true)
 |-- city: string (nullable = true)
 |-- country: string (nullable = true)
 |-- parameter: string (nullable = true)
 |-- unit: string (nullable = true)
 |-- value: double (nullable = true)
 |-- date_local: string (nullable = true)
 |-- date_utc: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- averagingPeriod: struct (nullable = true)
 |    |-- unit: string (nullable = true)
 |    |-- value: double (nullable = true)
 |-- sourceName: string (nullable = true)
 |-- sourceType: string (nullable = true)
 |-- mobile: boolean (nullable = true)



# Znaleźć miasto, w którym najniższe zanieczyszczenie w danym okresie jest największe spośród wszystkich miast

In [12]:
# Per city, take the smallest strictly positive 'pm25' value and order the cities
# from the highest such minimum downwards.
# `min` here is pyspark.sql.functions.min (imported in an earlier cell), not the builtin.
min_pm25 = (
    smog_df
    .filter(col("parameter") == "pm25")
    .filter(col("value") > 0.0)
    .groupBy("city")
    .agg(min("value").alias("minpm25"))
    .sort(col("minpm25").desc())
)

In [13]:
# Bring at most the first 100 rows of the ranking to the driver as a pandas
# DataFrame and render it.
display(min_pm25.limit(100).toPandas())

Unnamed: 0,city,minpm25
0,Nueva Libertad,38.08000
1,Consultorio - San Vicente,23.78000
2,039,20.00000
3,Hapur,17.00000
4,"INIA, Chillán",16.44000
5,Punteras,16.30000
6,Evanston,14.60000
7,Liceo Polivalente,13.00000
8,Bathinda,12.50000
9,Padre Las Casas II,11.00000


# Ranking miast pod względem liczby dni w roku, w których poziom jest wyższy niż X

In [14]:
# Add a day-level date column ("date3", a "yyyy-MM-dd" string) derived from 'date_local'.
from pyspark.sql.functions import date_format, to_date, countDistinct, substring
# 'date_local' is a string column. Only its first 10 characters are handed to
# to_date() so that the input matches the "yyyy-MM-dd" pattern exactly; parsing the
# whole string with that pattern relies on the parser tolerating trailing text,
# which the stricter datetime parser used by Spark 3.0+ does not.
# NOTE(review): assumes date_local begins with the calendar date as yyyy-MM-dd
# (e.g. an ISO-8601 timestamp) — confirm against the raw data.
smog_df = smog_df.withColumn(
    "date3",
    date_format(
        to_date(substring(col("date_local"), 1, 10), "yyyy-MM-dd"),
        "yyyy-MM-dd"
    )
)

In [15]:
# Threshold X for the ranking: keep only 'pm10' rows whose value is strictly above it.
x = 100.0
higherX = (
    smog_df
    .filter(col("parameter") == "pm10")
    .filter(col("value") > x)
)

In [16]:
# For each (city, country) pair, count how many distinct 'date3' days appear among
# the above-threshold rows.
ranking = (
    higherX
    .groupBy("city", "country")
    .agg(countDistinct("date3").alias("daycount"))
)

In [17]:
# Print the schema of the aggregated ranking (city, country, daycount).
ranking.printSchema()

root
 |-- city: string (nullable = true)
 |-- country: string (nullable = true)
 |-- daycount: long (nullable = false)



In [18]:
# Order by day count, highest first, and bring at most 100 rows to the driver as a
# pandas DataFrame; as the last expression it becomes the cell output.
ranking.sort(col('daycount').desc()).limit(100).toPandas()

Unnamed: 0,city,country,daycount
0,Ulaanbaatar,MN,292
1,Phoenix-Mesa-Scottsdale,US,291
2,GUANAJUATO,MX,278
3,PINAL,US,264
4,Kahramanmaraş,TR,256
5,El Centro,US,244
6,Adana,TR,240
7,Huainan,CN,238
8,Lima,PE,236
9,Saraburi,TH,232


In [19]:
# Same minimum-'pm25' ranking as before, but grouped by (city, country) and limited
# to rows with longitude below 50.0. This rebinds `min_pm25`.
# NOTE(review): the reason for the longitude cut-off is not stated here — confirm.
min_pm25 = (
    smog_df
    .filter(col("parameter") == "pm25")
    .filter(col("value") > 0.0)
    .filter(col("longitude") < 50.0)
    .groupBy("city", "country")
    .agg(min("value").alias("minpm25"))
    .sort(col("minpm25").desc())
)

In [20]:
# Render at most the first 50 rows of the ranking as a pandas DataFrame.
display(min_pm25.limit(50).toPandas())

Unnamed: 0,city,country,minpm25
0,Nueva Libertad,CL,38.08
1,Consultorio - San Vicente,CL,23.78
2,039,US,20.0
3,"INIA, Chillán",CL,16.44
4,Punteras,CL,16.3
5,Evanston,US,14.6
6,Liceo Polivalente,CL,13.0
7,Padre Las Casas II,CL,11.0
8,U.C. Maule,CL,10.0
9,Parque O'Higgins,CL,10.0


In [21]:
# Per (city, country), the smallest strictly positive 'pm10' value among rows with
# longitude below 50.0, ordered from the highest minimum downwards.
# NOTE(review): the reason for the longitude cut-off is not stated here — confirm.
min_pm10 = (
    smog_df
    .filter(col("parameter") == "pm10")
    .filter(col("value") > 0.0)
    .filter(col("longitude") < 50.0)
    .groupBy("city", "country")
    .agg(min("value").alias("minpm10"))
    .sort(col("minpm10").desc())
)

In [22]:
# Render at most the first 50 rows of the PM10 ranking as a pandas DataFrame.
display(min_pm10.limit(50).toPandas())

Unnamed: 0,city,country,minpm10
0,Chaiten Norte,CL,999.83
1,Punteras,CL,851.0
2,Coyhaique II,CL,265.0
3,Parque O'Higgins,CL,41.5
4,Ventanas,CL,39.42
5,Los Maitenes,CL,33.63
6,La Greda,CL,28.89
7,Padre Las Casas II,CL,22.0
8,"INIA, Chillán",CL,16.74
9,QUILICURA,CL,16.0


# Ranking krajów pod względem średniego zanieczyszczenia

In [23]:
# Mean of the strictly positive 'pm10' values per country, restricted to rows with
# longitude strictly between 10.0 and 50.0, ordered from the highest mean downwards.
avg_pm10 = (
    smog_df
    .filter(col("parameter") == "pm10")
    .filter(col("value") > 0.0)
    .filter(col("longitude") < 50.0)
    .filter(col("longitude") > 10.0)
    .groupBy("country")
    .agg(avg("value").alias("avgpm10"))
    .sort(col("avgpm10").desc())
)

In [24]:
# Render at most the first 50 rows of the per-country averages as a pandas DataFrame.
display(avg_pm10.limit(50).toPandas())

Unnamed: 0,country,avgpm10
0,MT,760.165687
1,HU,735.823406
2,TR,656.340383
3,MK,50.209658
4,BA,47.690126
5,PL,36.019816
6,RS,35.698248
7,ZA,28.204039
8,SI,27.99422
9,CZ,26.439316


# Średnie zanieczyszczenie w miastach na mapie

In [25]:
# Mean of the strictly positive 'pm10' values per (city, latitude, longitude) group,
# keeping only rows with latitude above 0.0 and longitude strictly between 10.0 and 50.0.
# This rebinds `avg_pm10`, which an earlier cell used for the per-country averages.
avg_pm10 = (
    smog_df
    .filter(col("parameter") == "pm10")
    .filter(col("value") > 0.0)
    .filter(col("latitude") > 0.0)
    .filter(col("longitude") < 50.0)
    .filter(col("longitude") > 10.0)
    .groupBy("city", "latitude", "longitude")
    .agg(avg("value").alias("avgpm10"))
)

In [None]:
# Collect the aggregated result to the driver as a pandas DataFrame for plotting.
pd_cities=avg_pm10.toPandas()

In [None]:
# Bare expression: show the collected pandas DataFrame as the cell output.
pd_cities

In [None]:
import matplotlib
import matplotlib.pyplot as plt
# Switch matplotlib to the 'ggplot' style sheet for the plots that follow.
matplotlib.style.use('ggplot')

In [None]:
# Scatter the rows by longitude/latitude, colouring each point by its 'avgpm10' value.
pd_cities.plot(
    kind="scatter",
    x="longitude",
    y="latitude",
    c="avgpm10",
    cmap="terrain",
    figsize=(10, 10)
)

In [None]:
from mpl_toolkits.basemap import Basemap