# Final Project Spark DataFrames Notebook

### Small Data

### Make all scripts executable

In [1]:
!!chmod a+x ./*/*.py

["chmod: changing permissions of './P1/spark_df_p1.py': Operation not permitted",
 "chmod: changing permissions of './P2/spark_df_p2.py': Operation not permitted",
 "chmod: changing permissions of './P3/spark_df_p3.py': Operation not permitted",
 "chmod: changing permissions of './P4/spark_df_p4.py': Operation not permitted",
 "chmod: changing permissions of './P5/spark_df_p5.py': Operation not permitted"]

### Remove all Results

In [2]:
!rm -rf ./*/*.result

### P1

In [None]:
%load ./P1/spark_df_p1.py
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *

spark = SparkSession.builder.master('local[*]').appName('words').getOrCreate()
sc = spark.sparkContext

try:
    # files
    lines_states = sc.textFile('../data/epa_hap_daily_summary-small.csv')
    #lines = sc.textFile('log.csv')

    # file mapping
    logRows = lines_states.filter( lambda line : len(line) > 0)    \
                    .zipWithIndex() \
                    .filter( lambda x: x[1] > 0) \
                    .map(lambda x: x[0]) \
                    .map( lambda line: line.split(',')) \
                    .map( lambda arr : Row( state = arr[24], countyCode = arr[1], site_num = arr[2]))    
    
    # Dataframe creation
    logRowsDF = spark.createDataFrame( logRows )
    #logRowsDF = spark.createDataFrame( logRows )
    #logRowsDF = logRowsDF.distinct() # Makes sure we are using different monitors
    
    logRows2DF = logRowsDF.select('state','countyCode','site_num').distinct().groupBy('state')\
                                                                         .agg(count('site_num').alias('Nr of Monitors'))\
                                                                         .sort('Nr of Monitors', ascending = False)   
    logRows2DF.show(100,truncate=50)

    sc.stop()
except Exception as e:
    print(e)
    sc.stop()


In [4]:
!!time python ./P1/spark_df_p1.py > ./P1/p1.result

['21/12/23 23:10:17 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable',
 "Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties",
 'Setting default log level to "WARN".',
 'To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).',
 '',
 '[Stage 0:>                                                          (0 + 4) / 4]',
 '                                                                                ',
 '',
 '[Stage 2:>                                                          (0 + 4) / 4]',
 '                                                                                ',
 'real\t0m16.302s',
 'user\t0m1.146s',
 'sys\t0m1.010s']

In [5]:
!cat ./P1/p1.result

+--------------------+--------------+
|               state|Nr of Monitors|
+--------------------+--------------+
|          California|           162|
|               Texas|           132|
|           Minnesota|            94|
|                Ohio|            89|
|            Michigan|            84|
|            New York|            66|
|      South Carolina|            64|
|        Pennsylvania|            60|
|             Montana|            60|
|             Indiana|            52|
|            Colorado|            51|
|            Illinois|            50|
|             Florida|            50|
|      North Carolina|            49|
|          Washington|            42|
|           Louisiana|            40|
|             Arizona|            38|
|              Kansas|            37|
|             Georgia|            34|
|              Oregon|            31|
|            Kentucky|            30|
|             Alabama|            28|
|           Tennessee|   

### P2

In [None]:
%load ./P2/spark_df_p2.py
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *

spark = SparkSession.builder.master('local[*]').appName('words').getOrCreate()
sc = spark.sparkContext

try:
    lines = sc.textFile('../data/epa_hap_daily_summary-small.csv') # Change the name of the file to what you have it named here
    logRows = lines.filter( lambda line: len(line) > 0) \
                     .zipWithIndex() \
                     .filter( lambda x: x[1] > 0) \
                     .map(lambda x: x[0]) \
                     .map( lambda line: line.split(',')) \
                     .map( lambda arr : Row( county_name = arr[25], state_code = arr[0], county_code = arr[1], arithmetic_mean = float(arr[16])))
    logRowsDF = spark.createDataFrame( logRows )
    logRowsDF.createOrReplaceTempView("log")
    
    # Necessary computations to solve the problem
    finalDF = logRowsDF.withColumn('county', (col('state_code')+col('county_code'))) \
                    .drop('state_code') \
                    .drop('county_code') \
                    .groupBy('county','county_name').avg('arithmetic_mean') \
                    .withColumnRenamed('avg(arithmetic_mean)', 'pollutant_levels') \
                    .orderBy(col('pollutant_levels').desc()) \
                    .drop('county') \

    finalDF.show(20)
    sc.stop()
    
except Exception as err:
    print(err)
    sc.stop()

In [7]:
!!time python ./P2/spark_df_p2.py > ./P2/p2.result

['21/12/23 23:10:42 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable',
 "Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties",
 'Setting default log level to "WARN".',
 'To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).',
 '',
 '[Stage 0:>                                                          (0 + 4) / 4]',
 '                                                                                ',
 '',
 '[Stage 2:>                                                          (0 + 4) / 4]',
 '                                                                                ',
 'real\t0m17.304s',
 'user\t0m1.182s',
 'sys\t0m1.015s']

In [8]:
!cat ./P2/p2.result

+---------------+------------------+
|    county_name|  pollutant_levels|
+---------------+------------------+
|         Tipton|            2556.0|
|         Nassau|              19.0|
|     Columbiana| 7.385690735785953|
|           Park| 5.611212121212121|
|CHIHUAHUA STATE|         4.5121875|
|       Caldwell| 4.116666666666667|
|          Kings|3.9843770491803276|
|         Madera|            3.7393|
|       Franklin|3.3499999999999996|
|      Jefferson|              3.07|
|        Oakland| 2.888877848101266|
|           Lake| 2.879328647058823|
|          Duval|2.7794603978494625|
|      Middlesex|2.6500000000000004|
|         Kearny|2.3753333333333333|
|          Bucks|2.3674999999999997|
|San Luis Obispo|2.3333333333333335|
|      Edgecombe|             2.325|
|         Pawnee|2.2941176470588234|
|    Westchester|          2.239375|
+---------------+------------------+
only showing top 20 rows



### P3

In [None]:
%load ./P3/spark_df_p3.py
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *

spark = SparkSession.builder.master('local[*]').appName('words').getOrCreate()
sc = spark.sparkContext

try:
    lines = sc.textFile('../data/epa_hap_daily_summary-small.csv') # Change the name of the file to what you have it named here
    logRows = lines.filter( lambda line: len(line) > 0) \
                     .zipWithIndex() \
                     .filter( lambda x: x[1] > 0) \
                     .map(lambda x: x[0]) \
                     .map( lambda line: line.split(',')) \
                     .map( lambda arr : Row( Year = arr[11][:4], State = arr[24], Arithmetic_mean = float(arr[16])))
    
    logRowsDF = spark.createDataFrame( logRows )
    
    logRows2DF = logRowsDF.select('State','Year','Arithmetic_mean')\
                            .groupBy('Year','State')\
                            .avg('Arithmetic_mean').withColumnRenamed('avg(Arithmetic_mean)','Avg Pollutants')\
                            .orderBy(['Year','Avg Pollutants'], ascending=[1,0])
                                                                    
    
    logRows2DF.show(200,truncate=50)
    
    sc.stop()
except Exception as e:
    print(e)
    sc.stop()

In [10]:
!!time python ./P3/spark_df_p3.py > ./P3/p3.result

['21/12/23 23:11:07 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable',
 "Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties",
 'Setting default log level to "WARN".',
 'To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).',
 '',
 '[Stage 0:>                                                          (0 + 4) / 4]',
 '                                                                                ',
 '',
 '[Stage 2:>                                                          (0 + 4) / 4]',
 '                                                                                ',
 'real\t0m16.751s',
 'user\t0m1.281s',
 'sys\t0m0.809s']

In [11]:
!cat ./P3/p3.result

+----+--------------------+---------------------+
|Year|               State|       Avg Pollutants|
+----+--------------------+---------------------+
|1990|           Tennessee|   170.40093066666665|
|1990|             Indiana|    4.098978378378379|
|1990|       Massachusetts|   3.0246823529411766|
|1990|             Montana|   2.0686790073529413|
|1990|               Texas|   1.4824716546762589|
|1990|            New York|    1.362972027972028|
|1990|             Florida|   1.2765626315789476|
|1990|              Kansas|   1.1408154574132492|
|1990|           Louisiana|   0.9145945945945947|
|1990|            Virginia|   0.8451416666666666|
|1990|District Of Columbia|   0.8261508196721312|
|1990|      South Carolina|   0.5598140350877193|
|1990|          California|  0.41153099836333895|
|1990|           Minnesota|             0.306488|
|1990|          New Jersey|   0.2933352941176471|
|1990|            Illinois|  0.14575701219512197|
|1990|                Ohio|    

### P4

In [None]:
%load ./P4/spark_df_p4.py
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *

spark = SparkSession.builder.master('local[*]').appName('words').getOrCreate()
sc = spark.sparkContext

try:
    # files
    lines_states = sc.textFile('../data/usa_states.csv')
    lines = sc.textFile('../data/epa_hap_daily_summary-small.csv')

    # file mapping
    logRows_states = lines_states.filter( lambda line : len(line) > 0)    \
                    .zipWithIndex() \
                    .filter( lambda x: x[1] > 0) \
                    .map(lambda x: x[0]) \
                    .map( lambda line: line.split(',')) \
                    .map( lambda arr : Row( state = arr[0], name = arr[1], minLat = float(arr[2]), \
                                            maxLat = float(arr[3]), minLon = float(arr[4]), \
                                            maxLon = float(arr[5])))
    logRows = lines.filter( lambda line: len(line) > 0) \
                     .zipWithIndex() \
                     .filter( lambda x: x[1] > 0) \
                     .map(lambda x: x[0]) \
                     .map( lambda line: line.split(',')) \
                     .map( lambda arr: Row( name = arr[24], county = arr[1], siteNum = arr[2], Lat = float("{:.3f}".format(float(arr[5]))), Lon = float("{:.3f}".format(float(arr[6]))) ) )    
    
    # Dataframe creation
    logRowsStatesDF = spark.createDataFrame( logRows_states )
    logRowsDF = spark.createDataFrame( logRows )
    logRowsDF = logRowsDF.distinct() # Makes sure we are using different monitors

    # Necessary computations to solve the problem
    finalDF = logRowsStatesDF.withColumn('center_Lat', (col('minLat')+col('maxLat'))/2 ) \
                        .withColumn('center_Lon', (col('minLon')+col('maxLon'))/2 ) \
                        .drop('minLon') \
                        .drop('maxLon') \
                        .drop('minLat') \
                        .drop('maxLat') \
                        .drop('state') \
                        .drop('county') \
                        .drop('siteNum') \
                        .join(logRowsDF, 'name') \
                        .withColumn('distance', sqrt( pow((col('Lat')-col('center_Lat'))*111,2) + pow((col('Lon')-col('center_Lon'))*111,2) ) ) \
                        .groupBy('name').avg('distance')
    
    finalDF.show(54)

    sc.stop()
    
except Exception as err:
    print(err)
    sc.stop()

In [13]:
!!time python ./P4/spark_df_p4.py > ./P4/p4.result

['21/12/23 23:11:35 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable',
 "Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties",
 'Setting default log level to "WARN".',
 'To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).',
 '',
 '[Stage 0:>                                                          (0 + 2) / 2]',
 '                                                                                ',
 '',
 '[Stage 1:>                                                          (0 + 4) / 4]',
 '                                                                                ',
 '',
 '[Stage 4:>                  (0 + 4) / 4][Stage 6:>                  (0 + 2) / 2]',
 '[Stage 4:>                                                          (0 + 4) / 4]',
 '                                                                                ',
 'real\t0m21.783s',
 

In [14]:
!cat ./P4/p4.result

+--------------+------------------+
|          name|     avg(distance)|
+--------------+------------------+
|          Utah|184.91876919510068|
|        Hawaii|155.72848584906362|
|     Minnesota|195.06726533715846|
|          Ohio|175.74265339396337|
|      Arkansas|157.84229127028243|
|        Oregon| 268.8565041755878|
|         Texas| 512.0338063321818|
|  North Dakota| 248.4324915397246|
|  Pennsylvania|251.41090880145097|
|   Connecticut| 49.99224954412998|
|      Nebraska|307.13572198974265|
|       Vermont| 521.9872635614968|
|        Nevada| 326.2811669321348|
|   Puerto Rico| 32.73340559951166|
|    Washington| 219.9837579015187|
|      Illinois|435.24508277080713|
|      Oklahoma|236.88430984912995|
|Virgin Islands| 73.48448478701415|
|      Delaware|51.577754815182395|
|        Alaska| 603.7055510312542|
|    New Mexico|183.17300873742957|
| West Virginia| 144.4948207089262|
|      Missouri| 230.0545549271448|
|  Rhode Island|22.194351406681417|
|

### P5

In [None]:
%load ./P5/spark_df_p5.py
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *

spark = SparkSession.builder.master('local[*]').appName('words').getOrCreate()
sc = spark.sparkContext

try:
    # files
    lines_states = sc.textFile('../data/usa_states.csv')
    lines = sc.textFile('../data/epa_hap_daily_summary-small.csv') # Change the name of the file to what you have it named here
    
    # file mapping
    logRows_states = lines_states.filter( lambda line : len(line) > 0)    \
                    .zipWithIndex() \
                    .filter( lambda x: x[1] > 0) \
                    .map(lambda x: x[0]) \
                    .map( lambda line: line.split(',')) \
                    .map( lambda arr : Row( state = arr[0], name = arr[1], centerLat = (float(arr[2]) + float(arr[3]))/2, \
                                            centerLon = (float(arr[4]) + float(arr[5]))/2))
    logRows = lines.filter( lambda line: len(line) > 0) \
                     .zipWithIndex() \
                     .filter( lambda x: x[1] > 0) \
                     .map(lambda x: x[0]) \
                     .map( lambda line: line.split(',')) \
                     .map( lambda arr: Row( name = arr[24], countyCode = arr[1], siteNum = arr[2], lat = float("{:.3f}".format(float(arr[5]))), lon = float("{:.3f}".format(float(arr[6]))) ) )  
    
    # Creates the dataframes
    logRowsDF = spark.createDataFrame( logRows )
    
    logRows_statesDF = spark.createDataFrame( logRows_states )
    
    
    logRowsDF = logRowsDF.select('name','countyCode','siteNum','lat','lon').distinct()
    
    logRowsDF = logRowsDF.select('name','lat','lon')
    
    
    joinedDF = logRowsDF.join(logRows_statesDF,logRowsDF.name == logRows_statesDF.name,"inner" )
    
    MonitorDF = joinedDF.select((logRowsDF.name).alias('name'),\
                                (logRowsDF.lat).alias('lat'),\
                                (logRowsDF.lon).alias('lon'),\
                                (logRows_statesDF.centerLat).alias('centerLat'),\
                                (logRows_statesDF.centerLon).alias('centerLon')\
                               )
    
    
    MonitorDF = MonitorDF.withColumn("quadrant", when((col("lat") < col("centerLat")) & (col("lon") < col("centerLon")),'NW')\
                                                 .when((col("lat") < col("centerLat")) & (col("lon") > col("centerLon")),'NE')\
                                                 .when((col("lat") > col("centerLat")) & (col("lon") < col("centerLon")),'SW')\
                                                 .when((col("lat") > col("centerLat")) & (col("lon") > col("centerLon")),'SE')
                                                 .otherwise('Center or Borders')
                                    )                              
    
    FinalMonitorDF = MonitorDF.groupBy('name','quadrant').agg(count('quadrant').alias('Nr of Monitors')).sort('name', ascending = True)
    
    FinalMonitorDF.show(400)
    

    sc.stop()
except Exception as err:
    print(err)
    sc.stop()

In [16]:
!!time python ./P5/spark_df_p5.py > ./P5/p5.result

['21/12/23 23:12:09 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable',
 "Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties",
 'Setting default log level to "WARN".',
 'To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).',
 '',
 '[Stage 0:>                                                          (0 + 2) / 2]',
 '                                                                                ',
 '',
 '                                                                                ',
 '',
 '[Stage 4:>                  (0 + 2) / 2][Stage 5:>                  (0 + 4) / 4]',
 '[Stage 5:>                                                          (0 + 4) / 4]',
 '                                                                                ',
 'real\t0m19.339s',
 'user\t0m1.204s',
 'sys\t0m1.030s']

In [17]:
!cat ./P5/p5.result

+--------------+--------+--------------+
|          name|quadrant|Nr of Monitors|
+--------------+--------+--------------+
|       Alabama|      NE|             4|
|       Alabama|      SW|            14|
|       Alabama|      NW|             7|
|       Alabama|      SE|             4|
|        Alaska|      NE|             2|
|        Alaska|      NW|             3|
|        Alaska|      SW|             3|
|        Alaska|      SE|             4|
|       Arizona|      NE|            16|
|       Arizona|      SE|             2|
|       Arizona|      NW|            10|
|       Arizona|      SW|            10|
|      Arkansas|      SW|             3|
|      Arkansas|      SE|             2|
|      Arkansas|      NW|             5|
|      Arkansas|      NE|             1|
|    California|      NE|            68|
|    California|      NW|            15|
|    California|      SW|            84|
|    California|      SE|             2|
|      Colorado|      SE|         