In [37]:
# Which states have more/less monitors? (Rank states!)
# para responder é necesário contar todas as posições diferenciadas (lat long) para cada estado
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *
from time import process_time,perf_counter
t_start,r_start=process_time(),perf_counter()
spark = SparkSession.builder.master('local[*]').appName('words').getOrCreate()
sc = spark.sparkContext

try :
    lines = sc.textFile('epa_hap_daily_summary-small.csv') #load txt
    header=lines.first()
    l2 = lines.filter( lambda line : line !=header) #filtra apenas linhas maiores que 0
    l3 = l2.map( lambda line : line.split(',') ) # faz o split das linhas num array
    logRows = l3.map( lambda arr : Row( state = arr[24], mon_pos = arr[5]+arr[6])) #map rdd com colonas nomeadas 
    DFsql = spark.createDataFrame( logRows ) # transforma rdd em dataframe
    query_1=DFsql.groupBy(col('state')).agg(countDistinct(col('mon_pos')).alias('monitors_number')).orderBy(col('monitors_number'),ascending=False)
    query_1.show()
    t_stop,r_stop=process_time(),perf_counter()
    print(f'process time, real time {t_stop-t_start,r_stop-r_start}')
    sc.stop()
except Exception as err:
    print(err)
    sc.stop()

+--------------+---------------+
|         state|monitors_number|
+--------------+---------------+
|    California|            170|
|         Texas|            133|
|     Minnesota|             94|
|      Michigan|             92|
|          Ohio|             91|
|      New York|             67|
|South Carolina|             64|
|       Montana|             62|
|  Pennsylvania|             61|
|       Florida|             55|
|       Indiana|             52|
|      Colorado|             51|
|North Carolina|             50|
|      Illinois|             49|
|    Washington|             43|
|     Louisiana|             41|
|       Arizona|             38|
|        Kansas|             37|
|       Georgia|             35|
|      Kentucky|             34|
+--------------+---------------+
only showing top 20 rows

process time, real time (0.2233009999999993, 7.068371300119907)


In [38]:
# Which states have more/less monitors? (Rank states!)
# para responder é necesário contar todas as posições diferenciadas (lat long) para cada estado
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *
from time import process_time,perf_counter
t_start,r_start=process_time(),perf_counter()
spark = SparkSession.builder.master('local[*]').appName('words').getOrCreate()
sc = spark.sparkContext

try :
    main_data=spark.read.option('header','true').csv('epa_hap_daily_summary-small.csv')
    query_1=main_data.groupBy(col('state_name')).agg(countDistinct(col('latitude'),col('longitude')).alias('monitors_number')).orderBy(col('monitors_number'),ascending=False)
    query_1.show()
    t_stop,r_stop=process_time(),perf_counter()
    print(f'process time, real time {t_stop-t_start,r_stop-r_start}')
    sc.stop()
except Exception as err:
    print(err)
    sc.stop()

+--------------+---------------+
|    state_name|monitors_number|
+--------------+---------------+
|    California|            170|
|         Texas|            133|
|     Minnesota|             94|
|      Michigan|             92|
|          Ohio|             91|
|      New York|             67|
|South Carolina|             64|
|       Montana|             62|
|  Pennsylvania|             61|
|       Florida|             55|
|       Indiana|             52|
|      Colorado|             51|
|North Carolina|             50|
|      Illinois|             49|
|    Washington|             43|
|     Louisiana|             41|
|       Arizona|             38|
|        Kansas|             37|
|       Georgia|             35|
|      Kentucky|             34|
+--------------+---------------+
only showing top 20 rows

process time, real time (0.06992939999999948, 3.791043800069019)


In [39]:
#Which counties have the best/worst air quality? (Rank counties considering pollutants’level
#linha indice 16 media de valores por dia soma(media diarias)/numero de dias 
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *
from time import process_time,perf_counter
t_start,r_start=process_time(),perf_counter()
spark = SparkSession.builder.master('local[*]').appName('words').getOrCreate()
sc = spark.sparkContext
try :
    lines = sc.textFile('epa_hap_daily_summary-small.csv') #load txt
    header=lines.first()
    l2 = lines.filter( lambda line : line !=header) #filtra apenas linhas maiores que 0
    l3 = l2.map( lambda line : line.split(',') ) # faz o split das linhas num array
    logRows = l3.map( lambda arr : Row( counties = arr[25], media_pol = arr[16])) #map rdd com colunas nomeadas
    DFsql = spark.createDataFrame( logRows ) # transforma rdd em dataframe
    query_2=DFsql.groupBy(col('counties')).agg(avg(col("media_pol")).alias('poluicao_media')).orderBy(col('poluicao_media'),asc=False)
    query_2.show(100)
    t_stop,r_stop=process_time(),perf_counter()
    print(f'process time, real time {t_stop-t_start,r_stop-r_start}')
    sc.stop()
except Exception as err:
    print(err)
    sc.stop()

+-------------------+--------------------+
|           counties|      poluicao_media|
+-------------------+--------------------+
|        Sweet Grass|                 0.0|
|             Martin|                 0.0|
|Wrangell Petersburg|  4.5359477124183E-5|
|   Northwest Arctic|6.333333333333333E-5|
|     Aleutians East|1.095714285714285...|
|              Eagle|1.162790697674418...|
|  Matanuska-Susitna|1.275885558583106...|
|    Kenai Peninsula|1.401041666666666...|
|      Yukon-Koyukuk|1.538461538461538...|
|              Lewis|1.573839662447257...|
|         Rio Blanco|1.597222222222222...|
|               Maui|1.712870012870013...|
|             Hawaii|1.738858195211786...|
|          Josephine|1.778510638297872...|
|             Denali|1.861187845303868E-4|
|           Okanogan|1.986374133949192E-4|
|           Siskiyou|2.145333333333333...|
|          Del Norte|2.147999999999999...|
|             Powell|2.212528473804101E-4|
|           Sublette|2.310253583241455...|
|          

In [40]:
#Q.3) Which states have the best/worst air quality in each year? (Rank states per year considering pollutants’ level!
# linha 12 contem data
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *
from time import process_time,perf_counter
t_start,r_start=process_time(),perf_counter()
spark = SparkSession.builder.master('local[*]').appName('words').getOrCreate()
sc = spark.sparkContext

try :
    lines = sc.textFile('epa_hap_daily_summary-small.csv') #load txt
    header=lines.first()
    l2 = lines.filter( lambda line : line !=header) #filtra apenas linhas maiores que 0
    l3 = l2.map( lambda line : line.split(',') ) # faz o split das linhas num array
    logRows = l3.map( lambda arr : Row( states = arr[24],data=arr[11], media_pol = arr[16])) #map rdd com colonas nomeadas 
    logRowsDF = spark.createDataFrame( logRows ) # transforma rdd em dataframe
    list_ipsDF= logRowsDF.groupBy(substring(col("data"),0,4).alias('ano'),col('states')).agg((sum(col("media_pol"))/count(col("media_pol"))).alias("poluicao media")).orderBy(col('ano'),col('poluicao media'))
    # pandas cria grafico ####################### desbloquear para gerar grafico
    '''df = list_ipsDF.toPandas()
    lista=['Colorado','Iowa','Michigan','Indiana','California']
    df1 = df[df['states'].isin(lista)]
    plt.figure(figsize=(10, 8))
    sns.lineplot(x='ano', y='poluicao media', hue='states', data=df1)
    plt.yscale('log')
    plt.legend(title='country', bbox_to_anchor=(1.05, 1), loc='upper left')
    plt.savefig('piores_estados.png')'''
    #############################################
    list_ipsDF.show(100)
    t_stop,r_stop=process_time(),perf_counter()
    print(f'process time, real time {t_stop-t_start,r_stop-r_start}')
    sc.stop()
except Exception as inst:
    print(inst)
    sc.stop()

+----+--------------------+--------------------+
| ano|              states|      poluicao media|
+----+--------------------+--------------------+
|1990|      Virgin Islands|                 0.0|
|1990|           Wisconsin|                 0.0|
|1990|            Oklahoma|                 0.0|
|1990|       West Virginia|                 0.0|
|1990|              Hawaii|1.970370370370370...|
|1990|              Nevada|4.208000000000000...|
|1990|              Alaska|4.420833333333333...|
|1990|        South Dakota|            5.705E-4|
|1990|          Washington|5.974999999999999E-4|
|1990|             Wyoming|6.045454545454545E-4|
|1990|                Utah|7.970588235294118E-4|
|1990|          New Mexico|8.222222222222222E-4|
|1990|              Oregon|8.596296296296297E-4|
|1990|             Arizona|8.620134228187919E-4|
|1990|               Maine|9.789285714285713E-4|
|1990|            Colorado|0.002162374100719...|
|1990|         Mississippi|0.002666666666666...|
|1990|            Mi

In [41]:
#Q.4) For each state, what is the average distance (in km) of the monitors in that state to the state center? 
#For simplicity, assume that 1 degree of latitude or logitude equals to 111 km. (Monitor dispersion per state!)
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *
import math
from time import process_time,perf_counter
t_start,r_start=process_time(),perf_counter()
spark = SparkSession.builder.master('local[*]').appName('words').getOrCreate()
sc = spark.sparkContext

try :
    main_data=spark.read.option('header','true').csv('epa_hap_daily_summary-small.csv') #data principal
    aux_data=spark.read.option('header','true').csv('usa_states.csv').select(col('Name'), col('MinLat'),col('MaxLat'),col('MinLon'),col('MaxLon')) #data auxiliar
    state_center=aux_data.withColumn('lat_center',(col('MinLat')+col('MaxLat'))/2).withColumn('lon_center',(col('MinLon')+col('MaxLon'))/2).select(col('Name'),col('lat_center'),col('lon_center'))
    df1=main_data.select(col('state_name'),col('latitude'),col('longitude')).distinct()
    df_joint=df1.join(state_center, df1.state_name ==  state_center.Name,"inner").withColumn('distance',(((col('latitude')-col('lat_center'))*111)**2 + ((col('longitude')-col('lon_center'))*111)**2)**(1/2))
    query=df_joint.groupBy(col('state_name')).agg(avg(col('distance')).alias('distance')).select(col('state_name'),col('distance')).orderBy(col('distance'),ascending=False)
    query.show(500)
    t_stop,r_stop=process_time(),perf_counter()
    print(f'process time, real time {t_stop-t_start,r_stop-r_start}')
    sc.stop()
except Exception as inst:
    print(inst)
    sc.stop()

+--------------------+------------------+
|          state_name|          distance|
+--------------------+------------------+
|            Virginia| 715.4329730260437|
|              Alaska| 603.6996422410685|
|               Texas|  512.183989163014|
|             Vermont|504.06323548508084|
|            Illinois|440.85401432128594|
|        South Dakota|365.84513743232617|
|             Florida| 336.5449146570832|
|          California|328.22638131553657|
|            Michigan| 326.4116064851129|
|              Nevada|326.28118071973904|
|            Nebraska| 307.1411826055287|
|              Kansas| 292.0796841296708|
|               Idaho|289.63507327563104|
|             Montana| 286.8383352756942|
|            New York| 283.7273398637172|
|             Wyoming| 283.6405863374747|
|              Oregon| 268.8538079232644|
|        Pennsylvania|251.41517634057166|
|        North Dakota|  248.421930732627|
|            Oklahoma|236.88257437298168|
|           Tennessee|235.97614951

In [42]:
#Q.5) How many sensors there are per quadrant (NW, NE, SE, SW) in each state? To answer this question, you should approximate each state’s area to a rectangle as
#defined in the file “usa_satates.csv”, and divide that area in 4 quadrants (NW,NE, SE, SW). (Count monitors per sate qudrant! 
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *
from time import process_time,perf_counter
t_start,r_start=process_time(),perf_counter()
spark = SparkSession.builder.master('local[*]').appName('words').getOrCreate()
sc = spark.sparkContext

try :
    main_data=spark.read.option('header','true').csv('epa_hap_daily_summary-small.csv') #data principal
    aux_data=spark.read.option('header','true').csv('usa_states.csv').select(col('Name'), col('MinLat'),col('MaxLat'),col('MinLon'),col('MaxLon')) #data auxiliar
    state_center=aux_data.withColumn('lat_center',(col('MinLat')+col('MaxLat'))/2).withColumn('lon_center',(col('MinLon')+col('MaxLon'))/2).select(col('Name'),col('lat_center'),col('lon_center'))
    df1=main_data.select(col('state_name'),col('latitude'),col('longitude')).distinct()
    df_joint=df1.join(state_center, df1.state_name ==  state_center.Name,"inner")
    df_w=df_joint.withColumn('NE',when((col('latitude')>col('lat_center')) & (col('longitude')>col('lon_center')),1).otherwise(0)).withColumn('SE',when((col('latitude')<col('lat_center')) & (col('longitude')>col('lon_center')),1).otherwise(0)).withColumn('SW',when((col('latitude')<col('lat_center')) & (col('longitude')<col('lon_center')),1).otherwise(0)).withColumn('NW',when((col('latitude')>col('lat_center')) & (col('longitude')<col('lon_center')),1).otherwise(0))
    query=df_w.select(col('Name'),col('NE'),col('SE'),col('SW'),col('NW')).groupBy(col('Name')).agg(sum(col('NE')),sum(col('SE')),sum(col('SW')),sum(col('NW')))
    query.show(50)
    t_stop,r_stop=process_time(),perf_counter()
    print(f'process time, real time {t_stop-t_start,r_stop-r_start}')
    sc.stop()
except Exception as inst:
    print(inst)
    sc.stop()

+--------------------+-------+-------+-------+-------+
|                Name|sum(NE)|sum(SE)|sum(SW)|sum(NW)|
+--------------------+-------+-------+-------+-------+
|                Utah|      0|      3|      3|      6|
|              Hawaii|      2|      1|      0|      2|
|           Minnesota|     11|     50|     21|     12|
|                Ohio|     30|     10|     36|     15|
|              Oregon|      3|      1|     13|     15|
|            Arkansas|      2|      1|      5|      3|
|District Of Columbia|      0|      1|      3|      1|
|               Texas|     34|     72|      3|     24|
|        North Dakota|      1|      1|      3|      2|
|        Pennsylvania|      3|     36|     18|      4|
|         Connecticut|      5|      0|      8|      2|
|             Vermont|      1|      1|      5|     16|
|            Nebraska|      1|      3|      0|      2|
|              Nevada|      2|      5|      0|      2|
|         Puerto Rico|      4|      0|      1|      1|
|         