## Q.1) Which states have more/less monitors? (Rank states!)

In [1]:
%%time
import pyspark
from operator import add as sum

sc = pyspark.SparkContext('local[*]')
try :
    lines = sc.textFile('epa_hap_daily_summary-small.csv')
    
    non_empty_lines = lines.filter( lambda line : len(line) > 0 )
    
    #filtrar o header
    header = non_empty_lines.first() #extract header
    non_empty_lines = non_empty_lines.filter(lambda row : row != header)   
    words = non_empty_lines.map( lambda line : line.split(','))

    #estado, linha
    stateLinha = words.map( lambda x : (x[0],{(x[5], x[6])}))
    nrStates = stateLinha.map(lambda x: x[0]).distinct()
    estadosComMonitores = stateLinha.reduceByKey(lambda a,b: a|b)
    
    #print(estadosComMonitores.collect())
    estadosOrdenados = estadosComMonitores.sortBy(lambda x: len(x[1]), ascending=False)
    estadosCount = estadosOrdenados.map(lambda x: (x[0], len(x[1])))
    print(estadosCount.collect())


    #so p verificar soma das linhas
    somaFinalAux = estadosCount.map(lambda x: x[1]).sum()
    sc.stop()
except Exception as e:
    print(e)
    sc.stop()

[('06', 170), ('48', 133), ('27', 94), ('26', 92), ('39', 91), ('36', 67), ('45', 64), ('30', 62), ('42', 61), ('12', 55), ('18', 52), ('08', 51), ('37', 50), ('17', 49), ('53', 43), ('22', 41), ('04', 38), ('20', 37), ('13', 35), ('21', 34), ('41', 32), ('01', 31), ('47', 29), ('55', 26), ('34', 24), ('50', 23), ('40', 22), ('23', 21), ('28', 21), ('51', 19), ('25', 19), ('80', 18), ('19', 18), ('29', 18), ('35', 18), ('16', 17), ('24', 17), ('33', 17), ('09', 15), ('44', 13), ('02', 12), ('49', 12), ('05', 11), ('54', 10), ('56', 9), ('32', 9), ('38', 7), ('46', 7), ('10', 6), ('31', 6), ('78', 6), ('72', 6), ('15', 5), ('11', 5)]
CPU times: user 1.7 s, sys: 730 ms, total: 2.43 s
Wall time: 28.3 s


## Q.2) Which counties have the best/worst air quality? (Rank counties considering pollutants’ level!)

In [2]:
%%time
import pyspark
import numpy as np

sc = pyspark.SparkContext('local[*]')

try :
    lines = sc.textFile('epa_hap_daily_summary-small.csv')
    
    non_empty_lines = lines.filter( lambda line : len(line) > 0 )    
    #print(non_empty_lines.take(2))
    header = non_empty_lines.first() #extract header
    #filtrar o header
    non_empty_lines = non_empty_lines.filter(lambda row : row != header) 

    features = non_empty_lines.map( lambda line : line.split(','))
    
    #county code vs average day
    linhaTuplo = features.map( lambda x : (x[0]+"|" + x[1], [float(x[16])]))
    averagesCounty = linhaTuplo.reduceByKey(lambda a,b: a+b)
 
    #TER EM CONTA AS DIFERENCAS ENTRE UNIDADES
    countyAvg = averagesCounty.map(lambda x: (x[0], np.sum(x[1])/len(x[1])))
    linhasSortedKey = countyAvg.sortBy(lambda x: (x[1]), ascending=False)
    print(linhasSortedKey.take(10))
    sc.stop()
except Exception as e:
    print(e)
    sc.stop()

[('47|167', 2556.0), ('36|059', 19.0), ('39|029', 7.3856907357859525), ('30|067', 5.61121212121212), ('80|006', 4.5121875), ('37|027', 4.116666666666667), ('06|031', 3.9843770491803276), ('06|039', 3.7393), ('37|069', 3.3499999999999996), ('08|059', 3.07)]
CPU times: user 413 ms, sys: 121 ms, total: 534 ms
Wall time: 10.2 s


# Q3 - Which states have the best/worst air quality in each year? (Rank states per year considering pollutants' levels!)

In [3]:
#%%time
import pyspark
import numpy as np
import math



sc = pyspark.SparkContext('local[*]')
try :    
    epa_daily = sc.textFile('epa_hap_daily_summary-small.csv')


    non_empty_lines_epa_daily = epa_daily.filter( lambda line : len(line) > 0 )

    header_epa_daily = non_empty_lines_epa_daily.first() #extract header


    non_empty_lines_epa_daily = non_empty_lines_epa_daily.filter(lambda row : row != header_epa_daily)  


    epa_daily = non_empty_lines_epa_daily.map(lambda line: line.split(','))

    #cria tuplos (ano,estado,ar)
    epa_daily=epa_daily.map(lambda coord: (coord[11].split('-')[0], coord[24],[float(coord[16])]))

    # fixa o ano e o estado e soma as qualidades de ar
    epa_daily=epa_daily.flatMap(lambda x : [ ((x[0],x[1]),x[2]) ]).reduceByKey(lambda a,b : (a+b))
    
    #média das qualidades de ar
    epa_daily=epa_daily.map(lambda x : (x[0],np.sum(x[1])/len(x[1])))
    
    #faz [ano,(estado1,ar1)] e ordena pela qual.ar. Cria uma lista para os valores
    epa_daily=epa_daily.map(lambda x : [x[0][0],(x[0][1],x[1])]).sortBy(lambda x: x[1][1]).groupByKey().mapValues(list)
    
    #ordena por ano
    epa_daily=epa_daily.sortByKey()

    for (k,v) in epa_daily.collect():
        print('Year ',k,': ')
        print(v,'\n')
    sc.stop()
except Exception as e:
    print(e)
    sc.stop()

Year  1990 : 
[('West Virginia', 0.0), ('Oklahoma', 0.0), ('Wisconsin', 0.0), ('Virgin Islands', 0.0), ('Hawaii', 0.00019703703703703704), ('Nevada', 0.0004208), ('Alaska', 0.00044208333333333334), ('South Dakota', 0.0005705), ('Washington', 0.0005974999999999999), ('Wyoming', 0.0006045454545454545), ('Utah', 0.0007970588235294118), ('New Mexico', 0.0008222222222222221), ('Oregon', 0.0008596296296296296), ('Arizona', 0.0008620134228187919), ('Maine', 0.0009789285714285713), ('Colorado', 0.0021623741007194244), ('Mississippi', 0.0026666666666666666), ('Missouri', 0.0056), ('Michigan', 0.006559896373056995), ('Connecticut', 0.0081), ('Georgia', 0.008366666666666666), ('Puerto Rico', 0.01005), ('North Carolina', 0.0143), ('Alabama', 0.024325), ('Iowa', 0.0332), ('Pennsylvania', 0.10596250000000002), ('Ohio', 0.135716), ('Illinois', 0.14575701219512194), ('New Jersey', 0.293335294117647), ('Minnesota', 0.3064879999999999), ('California', 0.41153099836333884), ('South Carolina', 0.559814035

# Q4 - For each state, what is the average distance (in km) of the monitors in that state to the state center? For simplicity, assume that 1 degree of latitude or logitude equals to 111 km. (Monitor dispersion per state!)

In [11]:
import pyspark
import math
import numpy as np

sc = pyspark.SparkContext('local[*]')
try :    
    epa_daily = sc.textFile('epa_hap_daily_summary-small.csv')
    usa_state = sc.textFile('usa_states.csv')

    non_empty_lines_epa_daily = epa_daily.filter( lambda line : len(line) > 0 )
    non_empty_lines_usa_state = usa_state.filter( lambda line : len(line) > 0 )

    header_epa_daily = non_empty_lines_epa_daily.first() #extract header
    header_usa_state = non_empty_lines_usa_state.first() #extract header



    non_empty_lines_epa_daily = non_empty_lines_epa_daily.filter(lambda row : row != header_epa_daily)  
    non_empty_lines_usa_state = non_empty_lines_usa_state.filter(lambda row : row != header_usa_state)  

    epa_daily = non_empty_lines_epa_daily.map(lambda line: line.split(','))
    usa_state = non_empty_lines_usa_state.map(lambda line: line.split(','))

    coordinates_monitor=epa_daily.map(lambda coord: (coord[24],(float(coord[5])*111,float(coord[6])*111)))
    coordinates_monitor=coordinates_monitor.distinct()

    coordinates_state=usa_state.map(lambda coord: (coord[1],((float(coord[2])*111+float(coord[3])*111)/2,(float(coord[4])*111+float(coord[5])*111)/2)))

    join_coordinates=coordinates_monitor.join(coordinates_state)
    distance_monitor=join_coordinates.map(lambda v1: (v1[0],[(math.sqrt((v1[1][0][0]-v1[1][1][0])**2+(v1[1][0][1]-v1[1][1][1])**2))]))#,((float(v1[0][1])-float(v1[1][1]))))) (v1[1][1][0]

    distance_monitor_state= distance_monitor.reduceByKey(lambda a,b: a+b)
    distance_monitor_state= distance_monitor_state.map(lambda x: (x[0], math.fsum(x[1])/len(x[1])))
    print(distance_monitor_state.sortBy(lambda x: x[1], ascending=False).collect())
    
    sc.stop()

except Exception as e:
    print(e)
    sc.stop()

[('Virginia', 715.4329730260437), ('Alaska', 603.6996422410685), ('Texas', 512.1839891630143), ('Vermont', 504.0632354850802), ('Illinois', 440.8540143212859), ('South Dakota', 365.84513743232594), ('Florida', 336.54491465708276), ('California', 328.22638131553674), ('Michigan', 326.4116064851137), ('Nevada', 326.28118071973887), ('Nebraska', 307.1411826055286), ('Kansas', 292.07968412967074), ('Idaho', 289.6350732756307), ('Montana', 286.8383352756939), ('New York', 283.72733986371713), ('Wyoming', 283.64058633747493), ('Oregon', 268.85380792326436), ('Pennsylvania', 251.41517634057158), ('North Dakota', 248.42193073262706), ('Oklahoma', 236.88257437298296), ('Tennessee', 235.97614951487589), ('Missouri', 234.32953412793935), ('Washington', 219.98044806799842), ('Kentucky', 219.9515168075211), ('Iowa', 206.59894103689075), ('Wisconsin', 202.84693007978447), ('Minnesota', 195.0682708282483), ('Utah', 184.91343040711567), ('Georgia', 184.34823507347093), ('New Mexico', 183.1891212852376

# Q5 - How many sensors there are per quadrant (NW, NE, SE, SW) in each state? To answer this question, you should approximate each state’s area to a rectangle as defined in the file “usa_satates.csv”, and divide that area in 4 quadrants (NW, NE, SE, SW). (Count monitors per sate qudrant!)

In [12]:
%%time
import pyspark
import numpy as np
from operator import add as sum

sc = pyspark.SparkContext('local[*]')
try :

    ########## Ter USA States com latitude e longitude medias ########## 

    usaStates = sc.textFile('usa_states.csv')
    
    non_empty_linesStates = usaStates.filter( lambda line : len(line) > 0 )    
    header = non_empty_linesStates.first() #extract header
    #filtrar o header
    non_empty_linesStates = non_empty_linesStates.filter(lambda row : row != header)  
    wordsUSA = non_empty_linesStates.map( lambda line : line.split(','))

    linhaUSA = wordsUSA.map( lambda x : (x[1], ( str((float(x[2])+float(x[3]))/2), str((float(x[4])+float(x[5]))/2))))
    
    #Construir dicionario com os estados e latitudes e longitudes medias
    dictUSA = dict()
    for i in linhaUSA.collect():
        dictUSA[i[0]] = (float(i[1][0]), float(i[1][1]))

    def compararCoordenadas(epaTuplo):
        if epaTuplo[0] in dictUSA:
            #percorrer todos os monitores para cada estado
            #monitor[0] = latitude
            #monitor[1] = longitude
            if (epaTuplo[1][0] <= float(dictUSA[epaTuplo[0]][0])) and (epaTuplo[1][1] <= float(dictUSA[epaTuplo[0]][1])):
                return (epaTuplo[0]+"NW",1)
            elif (epaTuplo[1][0] <= float(dictUSA[epaTuplo[0]][0])) and (epaTuplo[1][1] > float(dictUSA[epaTuplo[0]][1])):
                return (epaTuplo[0]+"SW",1)
            elif (epaTuplo[1][0] > float(dictUSA[epaTuplo[0]][0])) and (epaTuplo[1][1] <= float(dictUSA[epaTuplo[0]][1])):
                return (epaTuplo[0]+"NE",1)
            elif (epaTuplo[1][0] > float(dictUSA[epaTuplo[0]][0])) and (epaTuplo[1][1] > float(dictUSA[epaTuplo[0]][1])):
                return (epaTuplo[0]+"SE",1)
            else:
                return


    ########## Contar nr de monitores por quadrante de cada estado ########## 

    lines = sc.textFile('epa_hap_daily_summary-small.csv') 
    non_empty_lines = lines.filter( lambda line : len(line) > 0 )    
    header = non_empty_lines.first() #extract header
    #filtrar o header
    non_empty_lines = non_empty_lines.filter(lambda row : row != header) 
    #print(len(non_empty_lines.collect()))bem

    features = non_empty_lines.map( lambda line : line.split(','))
    #print(len(features.collect())) #da bem

    # #stateName vs latitudeMontior,longitudeMonitor
    #Remover monitores duplicados (state e coordenadas iguais)

    linhaTuplo = features.map( lambda x : (x[24], (float(x[5]), float(x[6])))).distinct()
    
    # percorrer os varios estados do linhaTuplo, ir buscar o estado correspondente ao linhaUSA, fazer comparacao de coordenadas
    linhaTuploComparar = linhaTuplo.map(compararCoordenadas).filter(lambda x: x != None)

    sumMonitorsQuadrant = linhaTuploComparar.reduceByKey(lambda a,b: a+b)
    #print(sumMonitorsQuadrant.collect())
    monitorsQuadrantSorted = sumMonitorsQuadrant.sortBy(lambda x: (x[1]), ascending=False)
    print(monitorsQuadrantSorted.collect())

    sc.stop()
except Exception as e:
    print(e)
    sc.stop()

[('CaliforniaNE', 84), ('MichiganSW', 74), ('TexasSW', 72), ('CaliforniaSW', 68), ('MinnesotaSW', 50), ('New YorkSW', 43), ('MontanaNW', 36), ('OhioNW', 36), ('PennsylvaniaSW', 36), ('LouisianaSW', 35), ('TexasSE', 34), ('South CarolinaNE', 33), ('IllinoisSE', 32), ('OhioSE', 30), ('FloridaSE', 27), ('North CarolinaSE', 26), ('ColoradoSE', 25), ('TexasNE', 24), ('FloridaSW', 23), ('GeorgiaNE', 21), ('WisconsinSW', 21), ('MinnesotaNW', 21), ('WashingtonNE', 20), ('North CarolinaNE', 19), ('KansasSE', 18), ('OklahomaSE', 18), ('South CarolinaSW', 18), ('IndianaNE', 18), ('PennsylvaniaNW', 18), ('IndianaSE', 18), ('New JerseySE', 18), ('ColoradoNE', 17), ('KentuckyNW', 16), ('ArizonaSW', 16), ('New YorkNE', 16), ('MarylandSE', 16), ('VermontNE', 16), ('CaliforniaNW', 16), ('WashingtonNW', 15), ('OregonNE', 15), ('OhioNE', 15), ('IllinoisNW', 14), ('MichiganSE', 14), ('AlabamaNE', 14), ('KentuckySE', 13), ('OregonNW', 13), ('New MexicoNE', 12), ('Rhode IslandSE', 12), ('MississippiSW', 12)