# Project - SparkDF

Analysis of large datasets is being performed at
an unprecedented frequency. Several technologies have been
developed to do so, offering a variety of solutions and drawbacks
related to the processing of different data types and
data processing requirements. 

This notebook implements SparkDF in order to solve a series of questions by using a data set regarding air polution in the USA.
In the report, we compared the performance
of five different technologies – MapReduce, Spark RDD,
SparkDF, Spark SQL and Hive.

## Q.1) Which states have more/less monitors? (Rank states!)

In [None]:
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import SQLContext

spark = SparkSession.builder.master('local[*]').appName('assig').getOrCreate()
sc = spark.sparkContext

try :
    lines = sc.textFile('./epa_hap_daily_summary-small.csv')
    l2 = lines.filter( lambda line : len(line) > 0 )
    header = l2.first()
    txt = l2.filter(lambda line: line != header)
    l3 = txt.map( lambda line : line.split(',') )
    logRows = l3.map( lambda arr : Row( state = arr[24], coords = arr[5]+arr[6]))

    logRowsDF = spark.createDataFrame( logRows )
    
    stateadd = logRowsDF.groupBy('state').agg(countDistinct('coords').alias('count'))
    sasorted= stateadd.sort(col("count").desc())
    
    sasorted.show(100)
except Exception as e:
    print(e)
sc.stop()


+--------------------+-----+
|               state|count|
+--------------------+-----+
|          California|  170|
|               Texas|  133|
|           Minnesota|   94|
|            Michigan|   92|
|                Ohio|   91|
|            New York|   67|
|      South Carolina|   64|
|             Montana|   62|
|        Pennsylvania|   61|
|             Florida|   55|
|             Indiana|   52|
|            Colorado|   51|
|      North Carolina|   50|
|            Illinois|   49|
|          Washington|   43|
|           Louisiana|   41|
|             Arizona|   38|
|              Kansas|   37|
|             Georgia|   35|
|            Kentucky|   34|
|              Oregon|   32|
|             Alabama|   31|
|           Tennessee|   29|
|           Wisconsin|   26|
|          New Jersey|   24|
|             Vermont|   23|
|            Oklahoma|   22|
|         Mississippi|   21|
|               Maine|   21|
|       Massachusetts|   19|
|            Virginia|   19|
|   Country Of

In [None]:
from google.colab import drive
drive.mount('/content/drive')

## Q.2) Which counties have the best/worst air quality? (Rank counties considering pollutants’ level!)

In [None]:
from pyspark.sql import *
from pyspark.sql.types import *
import pyspark.sql.functions as F

spark = SparkSession.builder.master('local[*]').appName('assig').getOrCreate()
sc = spark.sparkContext

try :
    lines = sc.textFile('./epa_hap_daily_summary-small.csv')
    l2 = lines.filter( lambda line : len(line) > 0 )
    header = l2.first()
    txt = l2.filter(lambda line: line != header)
    l3 = txt.map( lambda line : line.split(',') )
    logRows = l3.map( lambda arr : Row( county = arr[25], mean=arr[16]))
    logRowsDF = spark.createDataFrame( logRows )
    stateadd = logRowsDF.groupBy('county').agg(count('mean').alias('nrecords'), sum('mean').alias('totalmean'))
    new_df = stateadd.withColumn("meanOfmeans", (F.col("totalmean") / F.col("nrecords")))
    sasorted= new_df.sort(col("meanOfmeans").desc())
    final= sasorted.drop('nrecords', 'totalmean')
    final.show(100)
except Exception as e:
    print(e)
sc.stop()


+--------------------+------------------+
|              county|       meanOfmeans|
+--------------------+------------------+
|              Tipton|            2556.0|
|              Nassau|              19.0|
|          Columbiana| 7.385690735785953|
|     CHIHUAHUA STATE|         4.5121875|
|            Caldwell| 4.116666666666667|
|              Madera|            3.7393|
|             Oakland| 2.888877848101266|
|               Duval|2.7794603978494625|
|              Kearny|2.3753333333333333|
|               Bucks|2.3674999999999997|
|     San Luis Obispo|2.3333333333333335|
|           Edgecombe|             2.325|
|              Pawnee|2.2941176470588234|
|         Westchester|          2.239375|
|            Johnston|             2.225|
|            Hartford|2.0787055896226416|
|           Granville|2.0285714285714285|
|              Asotin|             2.025|
|              Duplin|               2.0|
|             Boulder| 1.960470901234568|
|          Crittenden|1.9000000000

## Q.3) Which states have the best/worst air quality in each year? (Rank states per year
considering pollutants’ level!)


In [None]:
from pyspark.sql import *
from pyspark.sql.types import *
import pyspark.sql.functions as F

spark = SparkSession.builder.master('local[*]').appName('assig').getOrCreate()
sc = spark.sparkContext

try :
    lines = sc.textFile('./epa_hap_daily_summary-small.csv')
    l2 = lines.filter( lambda line : len(line) > 0 )
    header = l2.first()
    txt = l2.filter(lambda line: line != header)
    l3 = txt.map( lambda line : line.split(',') )
    logRows = l3.map( lambda arr : Row( state = arr[24], mean=arr[16], year=arr[11].split('-')[0]))
    logRowsDF = spark.createDataFrame( logRows )
    stateadd = logRowsDF.select('state', 'year', 'mean').groupBy('state', 'year').agg(count('mean').alias('nrecords'), sum('mean').alias('totalmean'))
    new_df = stateadd.withColumn("meanOfmeans", (F.col("totalmean") / F.col("nrecords")))
    sasorted= new_df.sort(col("meanOfmeans").desc())
    final= sasorted.drop('nrecords', 'totalmean')
    final.show(100)
except Exception as e:
    print(e)
sc.stop()


+--------------------+----+------------------+
|               state|year|       meanOfmeans|
+--------------------+----+------------------+
|           Tennessee|1990|170.40093066666665|
|   Country Of Mexico|1995|              8.46|
|            Michigan|2001| 4.506138716367713|
|       Massachusetts|1993| 4.305833285714285|
|            Colorado|2017|4.2250000000000005|
|             Indiana|1990| 4.098978378378379|
|            Illinois|1992| 3.911825163398692|
|       Massachusetts|1994|3.4609906122448977|
|           Louisiana|1995| 3.364348865853659|
|        Rhode Island|1994|3.3635714000000005|
|             Alabama|1996| 3.226314057971015|
|         Connecticut|1993|3.0975461538461535|
|       Massachusetts|1990|3.0246823529411766|
|           Wisconsin|1994|2.9504833333333336|
|             Indiana|1993|2.8972258064516128|
|        Rhode Island|1995|2.7313043478260868|
|            Delaware|1993|          2.723077|
|             Indiana|1992|2.6606363636363635|
|        Penn

## Q.4) For each state, what is the average distance (in km) of the monitors in that state to
the state center? For simplicity, assume that 1 degree of latitude or logitude equals
to 111 km. (Monitor dispersion per state!)


In [None]:
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *
import pyspark.sql.functions as F

spark = SparkSession.builder.master('local[*]').appName('assig').getOrCreate()
sc = spark.sparkContext

try :
    lines = sc.textFile('./epa_hap_daily_summary-small.csv')
    l2 = lines.filter( lambda line : len(line) > 0 )
    header = l2.first()
    txt = l2.filter(lambda line: line != header)
    l3 = txt.map( lambda line : line.split(',') )
    
    #states info
    states= sc.textFile('./usa_states.csv')
    statesfil = states.filter( lambda line : len(line) > 0 )
    header = statesfil.first()
    txt = statesfil.filter(lambda line: line != header)
    statesmap = txt.map( lambda line : line.split(',') )
    stateRows= statesmap.map( lambda arr : Row( statename = arr[1], minlat=arr[2], maxlat=arr[3], minlon=arr[4], maxlon=arr[5]))
    statesDF = spark.createDataFrame( stateRows )
    #statesDF.show(10)
    #ETL states
    new_df2 = statesDF.withColumn("meanlat", F.col("minlat") + abs(F.col("maxlat") - F.col("minlat"))/2)
    new_df = new_df2.withColumn("meanlon", F.col("minlon")+((F.col("maxlon") - F.col("minlon"))/2))
    #new_df.show(100)
    
    #evry address info
    logRows = l3.map( lambda arr : Row(sn=arr[24], lat=arr[5], lon=arr[6]))
    logRowsDF = spark.createDataFrame( logRows )
    logd= logRowsDF.distinct()
    
    #logd.show()

    
    #join
    ndf = logd.join(new_df, [logd.sn==new_df.statename], 'inner')
    ndf3= ndf.drop( 'minlat', 'minlon','maxlat', 'maxlon')

    #ETL join
    ndf1 = ndf3.withColumn("dlat", abs(F.col("lat") - F.col("meanlat")))
    ndf2 = ndf1.withColumn("dlon", abs(F.col("lon") - F.col("meanlon")))
    hip = ndf2.withColumn("h", sqrt(abs(F.col("dlat"))**2 + abs(F.col("dlon")**2)))
    ndf3= hip.drop( 'lat', 'lon','meanlat', 'meanlon')
    ndf5= hip.sort(col('h').desc())
    ndf6=ndf5.filter(col('lat')!=0)
    nhdp = ndf6.withColumn("hkm", F.col('h')*111)
    ndf7=nhdp.drop('lat', 'lon', 'sn', 'meanlat', 'meanlon', 'dlon', 'dlat', 'h')
    ndf4= ndf7.groupBy( 'statename').agg({'hkm': 'avg'}).sort(col('statename'))
    ndf4.show()

except Exception as e:
    print(e)
sc.stop()


+-----------+------------------+
|  statename|          avg(hkm)|
+-----------+------------------+
|    Alabama|167.57779614651386|
|     Alaska| 603.6996422410684|
|    Arizona|178.87574055612177|
|   Arkansas|157.85010472330006|
| California| 328.2263813155365|
|   Colorado|180.25974203315081|
|Connecticut|  49.9897454877856|
|   Delaware|  51.5797704802335|
|    Florida| 336.5449146570832|
|    Georgia|184.34823507347105|
|     Hawaii|155.73279515048372|
|      Idaho|  289.635073275631|
|   Illinois| 224.0676150694105|
|    Indiana|177.74195777189746|
|       Iowa| 206.5989410368908|
|     Kansas| 292.0796841296708|
|   Kentucky| 219.9515168075211|
|  Louisiana|173.27631230948307|
|      Maine| 167.7714354141443|
|   Maryland| 89.28759445559284|
+-----------+------------------+
only showing top 20 rows



## Q.5) How many sensors there are per quadrant (NW, NE, SE, SW) in each state? 

In [None]:
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *
import pyspark.sql.functions as F

spark = SparkSession.builder.master('local[*]').appName('assig').getOrCreate()
sc = spark.sparkContext

try :
    #states info
    states= sc.textFile('./usa_states.csv')
    statesfil = states.filter( lambda line : len(line) > 0 )
    header = statesfil.first()
    txt = statesfil.filter(lambda line: line != header)
    statesmap = txt.map( lambda line : line.split(',') )
    stateRows= statesmap.map( lambda arr : Row( statename = arr[1], minlat=arr[2], maxlat=arr[3], minlon=arr[4], maxlon=arr[5]))
    statesDF = spark.createDataFrame( stateRows )
    #ETL states
    new_df2 = statesDF.withColumn("meanlat", F.col("minlat") + abs(F.col("maxlat") - F.col("minlat"))/2)
    new_df = new_df2.withColumn("meanlon", F.col("minlon")+((F.col("maxlon") - F.col("minlon"))/2))
    #new_df.show(100)
    
    
    #evry address info
    lines = sc.textFile('./epa_hap_daily_summary-small.csv')
    l2 = lines.filter( lambda line : len(line) > 0 )
    header = l2.first()
    txt = l2.filter(lambda line: line != header)
    l3 = txt.map( lambda line : line.split(',') )
    logRows = l3.map( lambda arr : Row(sc = arr[0], sn=arr[24], coords = arr[5]+arr[6], lat=arr[5], lon=arr[6]))
    logRowsDF = spark.createDataFrame( logRows )
    logd= logRowsDF.distinct()
    aux4= logd.groupBy('sn').agg(count('coords').alias('countState'))
    #logd.show(10)
    
    
    #join
    ne = logd.join(new_df, [logd.sn==new_df.statename, logd.lat>=new_df.meanlat, logd.lon>=new_df.meanlon], 'inner')
    negb= ne.drop( 'minlat', 'minlon','maxlat', 'maxlon')
    #negb.show(10)
    nef= negb.groupBy('sn').agg(countDistinct('coords').alias('countNE'))
    
    nw = logd.join(new_df, [logd.sn==new_df.statename, logd.lat>=new_df.meanlat, logd.lon<=new_df.meanlon], 'inner')
    nwgb= nw.drop( 'minlat', 'minlon','maxlat', 'maxlon')
    #nwgb.show(10)
    nwf= nwgb.groupBy('sn').agg(countDistinct('coords').alias('countNW'))
    
    se = logd.join(new_df, [logd.sn==new_df.statename, logd.lat<=new_df.meanlat, logd.lon>=new_df.meanlon], 'inner')
    segb= se.drop( 'minlat', 'minlon','maxlat', 'maxlon')
    #segb.show(10)
    sef= segb.groupBy('sn').agg(countDistinct('coords').alias('countSE'))
     
    sw = logd.join(new_df, [logd.sn==new_df.statename, logd.lat<=new_df.meanlat, logd.lon<=new_df.meanlon], 'inner')
    swgb= sw.drop( 'minlat', 'minlon','maxlat', 'maxlon')
    #swgb.show(10)
    swf= swgb.groupBy('sn').agg(countDistinct('coords').alias('countSW'))
 
    #join2
    aux1 = nef.join(nwf, [nef.sn==nwf.sn], 'inner').drop(nwf.sn)
    aux2= aux1.join(sef, [aux1.sn==sef.sn], 'inner').drop(sef.sn)
    aux3= aux2.join(swf, [aux2.sn==swf.sn], 'inner').drop(swf.sn)
    aux5= aux3.join(aux4, aux4.sn==aux3.sn, 'inner').drop(aux4.sn)
    table= aux5.withColumn("calcAllcount", F.col("countNE")+F.col("countNW")+F.col("countSE")+F.col("countSW"))
    table_sorted = table.sort(col("sn"))
    table_sorted.show(10)
    
except Exception as e:
    print(e)
sc.stop()


+-------+----------+-------+-------+-------+----------+------------+
|countNE|        sn|countNW|countSE|countSW|countState|calcAllcount|
+-------+----------+-------+-------+-------+----------+------------+
|      5|   Alabama|     14|      5|      7|        31|          31|
|      4|    Alaska|      3|      2|      3|        12|          12|
|      2|   Arizona|     10|     16|     10|        38|          38|
|      2|  Arkansas|      3|      1|      5|        11|          11|
|      2|California|     84|     68|     16|       170|         170|
|     25|  Colorado|     17|      4|      5|        51|          51|
|      4|   Georgia|     21|      5|      5|        35|          35|
|     32|  Illinois|      2|      1|     14|        49|          49|
|     18|   Indiana|     18|      8|      8|        52|          52|
|     18|    Kansas|      2|      9|      8|        37|          37|
+-------+----------+-------+-------+-------+----------+------------+
only showing top 10 rows

