In [1]:
import pandas as pd
import numpy as np
import pyspark
import findspark
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql.functions import udf
from pyspark.sql.functions import isnan, when, count, col
from pyspark.sql.functions import min,max
from pyspark.sql.functions import weekofyear, sum
import pyspark.sql.types as types
import math
from ipywidgets import interact, interactive, fixed, interact_manual
import ipywidgets as widgets

In [2]:
sc = SparkContext() 
sqlContext = pyspark.SQLContext(sc)
spark = pyspark.sql.SparkSession(sc) 

In [3]:
data = spark.read.csv("clean_data.csv",header='true',inferSchema='true')

In [4]:
data.count()

435735

In [6]:
data.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- no2: double (nullable = true)
 |-- spm: double (nullable = true)
 |-- location: string (nullable = true)
 |-- rspm: double (nullable = true)
 |-- sampling_date: string (nullable = true)
 |-- date: timestamp (nullable = true)
 |-- state: string (nullable = true)
 |-- so2: double (nullable = true)
 |-- type: string (nullable = true)



In [12]:
data.createOrReplaceTempView("env")

In [13]:
spark.sql("show tables").show()

+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
|        |      env|       true|
+--------+---------+-----------+



In [15]:
spark.sql("select * from env").show()

+---+----+------------------+---------+------------------+------------------+-------------------+--------------+----+--------------------+
|_c0| no2|               spm| location|              rspm|     sampling_date|               date|         state| so2|                type|
+---+----+------------------+---------+------------------+------------------+-------------------+--------------+----+--------------------+
|  0|17.4|220.78347959832885|Hyderabad|108.83278418538401|February - M021990|1990-02-01 00:00:00|Andhra Pradesh| 4.8|Residential, Rura...|
|  1| 7.0|220.78347959832885|Hyderabad|108.83278418538401|February - M021990|1990-02-01 00:00:00|Andhra Pradesh| 3.1|     Industrial Area|
|  2|28.5|220.78347959832885|Hyderabad|108.83278418538401|February - M021990|1990-02-01 00:00:00|Andhra Pradesh| 6.2|Residential, Rura...|
|  3|14.7|220.78347959832885|Hyderabad|108.83278418538401|   March - M031990|1990-03-01 00:00:00|Andhra Pradesh| 6.3|Residential, Rura...|
|  4| 7.5|220.7834795983288

In [18]:
spark.sql("select min(date) as daterange from env union select max(date) from env").show()

+-------------------+
|          daterange|
+-------------------+
|1987-01-01 00:00:00|
|2015-12-31 00:00:00|
+-------------------+



In [29]:
spark.sql("select location,date from env order by so2 DESC limit 5").show()

+--------+-------------------+
|location|               date|
+--------+-------------------+
| Chennai|2011-02-05 00:00:00|
|  Nagpur|2012-06-25 00:00:00|
|Raniganj|2011-04-02 00:00:00|
|Raniganj|2011-11-01 00:00:00|
|Raniganj|2011-12-27 00:00:00|
+--------+-------------------+



In [37]:
spark.sql("select location,state,t1.year from (select no2,location,state,year(date) as year from env) t1,(select min(no2) as min_no2, year(date) as year from env group by year) t2 where t1.year=t2.year and min_no2=no2 group by location,state,t1.year order by t1.year").show()

+-----------------+-----------+----+
|         location|      state|year|
+-----------------+-----------+----+
|           Madras| Tamil Nadu|1987|
|           Madras| Tamil Nadu|1988|
|           Haldia|West Bengal|1989|
|Daman Diu & Nagar|Daman & Diu|1990|
|         Guwahati|      Assam|1991|
|     Yamuna Nagar|    Haryana|1992|
|        Faridabad|    Haryana|1992|
|            Nalco|     Odisha|1992|
|         Rourkela|     Odisha|1992|
|           Rajkot|    Gujarat|1993|
|        Kozhikode|     Kerala|1994|
|       Coimbatore| Tamil Nadu|1995|
|        Jalandhar|     Punjab|1996|
|           Cochin|     Kerala|1997|
|           Madras| Tamil Nadu|1997|
|       Coimbatore| Tamil Nadu|1997|
|         Shillong|  Meghalaya|1997|
|         Shillong|  Meghalaya|1998|
|           Cochin|     Kerala|1998|
|         Shillong|  Meghalaya|1999|
+-----------------+-----------+----+
only showing top 20 rows



In [41]:
spark.sql("(select 'no2' as gas,year(date) as year from env group by year order by max(no2) limit 1) union (select 'so2' as gas,year(date) as year from env group by year order by max(so2) limit 1) union (select 'spm' as gas,year(date) as year from env group by year order by max(spm) limit 1) union (select 'rspm' as gas,year(date) as year from env group by year order by max(rspm) limit 1)").show()

+----+----+
| gas|year|
+----+----+
|rspm|1990|
| no2|1998|
| spm|2015|
| so2|2003|
+----+----+



In [53]:
spark.sql("select month(date) as month,sum(so2) as so2 from env where state like 'Tamil Nadu' and year(date)=2001 group by month order by so2 DESC limit 1").show()

+-----+------------------+
|month|               so2|
+-----+------------------+
|   12|176.80000000000004|
+-----+------------------+



In [68]:
spark.sql("select location,env.spm from env,(select max(spm) as spm from env where year(date)=2001 and month(date)=11 and dayofmonth(date)=1) t1 where year(date)=2001 and month(date)=11 and dayofmonth(date)=1 and env.spm=t1.spm").show()

+--------+-----+
|location|  spm|
+--------+-----+
|  Kanpur|811.0|
+--------+-----+



In [71]:
range_slider = widgets.IntRangeSlider(
    value=[1987, 2015],
    min=1987, max=2015, step=1,
    description='day range:',
    
)

In [73]:
range_slider

IntRangeSlider(value=(1994, 2009), description='day range:', max=2015, min=1987)

In [80]:
query = ("select sum(so2) as total_so2, sum(no2) as total_no2, sum(spm) as total_spm, sum(rspm) as total_rspm from env where year(date) between {} and {}"
         .format(range_slider.value[0],range_slider.value[1]))
spark.sql(query).show()

+-----------------+-----------------+--------------------+-------------------+
|        total_so2|        total_no2|           total_spm|         total_rspm|
+-----------------+-----------------+--------------------+-------------------+
|2161771.501226218|4872989.589518057|4.0145263220047936E7|1.993355516735378E7|
+-----------------+-----------------+--------------------+-------------------+



In [88]:
spark.sql("select date,(so2+no2) as gas from env,(select max(so2+no2) as maxgas from env where location='Faridabad') t1 where maxgas=(so2+no2)").show()



+-------------------+-----+
|               date|  gas|
+-------------------+-----+
|2015-03-26 00:00:00|218.0|
|1999-02-01 00:00:00|218.0|
|2011-10-27 00:00:00|218.0|
|2012-02-15 00:00:00|218.0|
+-------------------+-----+



In [90]:
spark.sql("select year(date) as year,avg(so2) as avg_so2, avg(no2) as avg_no2, avg(spm) as avg_spm, avg(rspm) as avg_rspm from env group by year(date) order by year(date)").show()

+----+------------------+------------------+------------------+------------------+
|year|           avg_so2|           avg_no2|           avg_spm|          avg_rspm|
+----+------------------+------------------+------------------+------------------+
|1987|18.897787927066712|29.491222065387078| 278.4012901728386|108.83278418538418|
|1988|20.093998803096028|29.760822757336758|247.35356990069138|108.83278418538391|
|1989| 18.31566018218572|29.133040653652802|237.62027260897725|108.83278418538373|
|1990| 17.46782342609222| 25.71627571793857|242.30536151344407|108.83278418538318|
|1991| 17.17563717526931|25.979129704731992|241.39668863701894|108.83278418538313|
|1992| 17.05150799054437|30.458832108204238|199.39217115784672|108.83278418538326|
|1993|21.582684427808616|30.100066576626993| 226.2074549641353|108.83278418538322|
|1994|21.902739179733913| 31.49709287273406|243.56870867004744|108.83278418538316|
|1995| 23.78482081866733|32.245831621258404|242.81573746263095|108.83278418538315|
|199

In [91]:
spark.sql("select state,count(*) as num_of_records from env group by state").show()

+-----------------+--------------+
|            state|num_of_records|
+-----------------+--------------+
|         Nagaland|          2463|
|        Karnataka|         17118|
|           Odisha|         19278|
|           Kerala|         24728|
|       Tamil Nadu|         20597|
|     Chhattisgarh|          7831|
|   Andhra Pradesh|         26368|
|   Madhya Pradesh|         19920|
|           Punjab|         25634|
|          Manipur|            76|
|      Daman & Diu|           782|
|  Jammu & Kashmir|          1289|
|              Goa|          6206|
|          Mizoram|          5338|
| Himachal Pradesh|         22896|
|       Puducherry|          3785|
|          Haryana|          3420|
|        Jharkhand|          5968|
|Arunachal Pradesh|            90|
|          Gujarat|         21279|
+-----------------+--------------+
only showing top 20 rows

