# Spark DataFrame - Basics

Let's start off with the fundamentals of Spark DataFrame. 

Objective: In this exercise, you'll find out how to start a spark session, read in data, explore the data and manipuluate the data (using DataFrame syntax as well as SQL syntax). Let's get started! 

In [None]:
# Must be included at the beginning of each new notebook. Remember to change the app name.
import findspark
findspark.init('/home/ubuntu/spark-3.2.1-bin-hadoop2.7')
import warnings
warnings.simplefilter(action='ignore')
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('readin').getOrCreate()

In [None]:
# Let's read in the data. Note that it's in the csv

#City,Date,PM2.5,PM10,NO,NO2,NOx,NH3,CO,SO2,O3,Benzene,Toluene,Xylene,AQI,AQI_Bucket
#define the schema

# Let's import in the relevant types.
warnings.filterwarnings('ignore')
from pyspark.sql.types import *
Schema=StructType([
  StructField("City",StringType(),nullable=True),
  StructField("Date",StringType(),nullable=True),
  StructField("PM25",FloatType(),nullable=True),
  StructField("PM10",FloatType(),nullable=True),
  StructField("NO",FloatType(),nullable=True),
  StructField("NO2",FloatType(),nullable=True),
  StructField("NOX",FloatType(),nullable=True),
  StructField("NH3",FloatType(),nullable=True),
  StructField("CO",FloatType(),nullable=True),
  StructField("SO2",FloatType(),nullable=True),
  StructField("O3",FloatType(),nullable=True),
  StructField("benzene",FloatType(),nullable=True),
  StructField("toluene",FloatType(),nullable=True),
  StructField("Xylene",FloatType(),nullable=True),
  StructField("AQI",FloatType(),nullable=True),
  StructField("AQIBucket",StringType(),nullable=True)
])
df = spark.read.option("header",True).schema(Schema).csv("Datasets/city_day.csv")

df.show()


## Data Exploration

In [None]:
# The show method allows you visualise DataFrames. We can see that there are two columns. 
df.show()

# You could also try this. 
df.columns

df.dtypes

df.describe().toPandas()


In [None]:
# We can use the describe method get some general statistics on our data too. Remember to show the DataFrame!
# But what about data type?
# Then create a variable with the correct structure.
df.describe().show()

In [None]:
# For type, we can use print schema. 
# But wait! What if you want to change the format of the data? Maybe change age to an integer instead of long?
# And now we can read in the data using that schema. If we print the schema, we can see that age is now an integer.
df.printSchema()

## Data Manipulation

In [None]:
df.describe().toPandas()

df.groupby('AQIBucket').count().show()


In [None]:

df.groupby('City').count().show()

In [None]:
df.count()
# Let's see the data. You'll notice nulls.
df.show()

In [None]:
# First, we have to register the DataFrame as a SQL temporary view.
df.createOrReplaceTempView('pollution')

# After that, we can use the SQL programming language for queries. 
results = spark.sql("SELECT * FROM pollution")
results.show()

In [None]:
# After that, we can use the SQL programming language for queries. 
results1 = spark.sql("SELECT city, count(City) FROM pollution where AQI is null group by City")
results1.show()

In [None]:
# Find count for empty, None, Null, Nan with string literals.
from pyspark.sql.functions import col,isnan,when,count
df2 = df.select([count(when(col(c).contains('None') | \
                            col(c).contains('NULL') | \
                            (col(c) == '' ) | \
                            col(c).isNull() | \
                            isnan(c), c 
                           )).alias(c)
                    for c in df.columns])
df2.show()

In [None]:
from pyspark.sql.functions import isnan, when, count, col

df.select([count(when(isnan(c), c)).alias(c) for c in df.columns]).show()

# Exploring data with SQL

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns
import warnings
warnings.simplefilter(action='ignore')

fig = plt.figure(figsize=(25,13))
st = fig.suptitle("Distribution of features",fontsize = 50, verticalalignment="center")
for col,num in zip(df.toPandas().describe().columns, range(1,11)):
    ax = fig.add_subplot(3,4, num)
    ax.hist(df.toPandas()[col])
    plt.grid(False)
    plt.xticks(rotation=45, fontsize=20)
    plt.yticks(fontsize=15)
    plt.title(col.upper(), fontsize=20)

plt.tight_layout()
st.set_y(0.95)
fig.subplots_adjust(top=0.85, hspace=0.4)
plt.show()

In [None]:
#Import all the required functions
from pyspark.sql.functions import year 
df.createOrReplaceTempView('pollution')

results1 = spark.sql("SELECT city, Count(AQIBucket), count(City) FROM pollution where AQIBucket is not null group by City, AQIBucket")
results1.show()


In [None]:
# Requires a certain amount of non-null values. Row two was dropped, as there's only one non-null value.
df.na.drop(thresh=8).show()

In [None]:
df.count()

In [None]:
import sys
###backward fill
from pyspark.sql import Window
from pyspark.sql.functions import first,last

# define the window
window = Window.partitionBy('City')\
               .orderBy('Date')\
               .rowsBetween(0, sys.maxsize)

# define the forward-filled column
filled_column = first(df['PM25'], ignorenulls=True).over(window)

# do the fill
spark_df_filled = df.withColumn('PM25', filled_column)

# show off our glorious achievements
spark_df_filled.orderBy('City', 'Date').show(10) 
spark_df_filled.show()

# define the window
window = Window.partitionBy('City')\
               .orderBy('Date')\
               .rowsBetween(-sys.maxsize,0)

# define the forward-filled column
filled_column = last(spark_df_filled['PM25'], ignorenulls=True).over(window)

# do the fill
spark_df_filled = spark_df_filled.withColumn('PM25', filled_column)
# show off our glorious achievements

spark_df_filled.orderBy('City', 'Date').show(10) 
spark_df_filled.show()


In [None]:
spark_df_filled.filter(spark_df_filled.PM25.isNull()).show()

In [None]:
# Find count for empty, None, Null, Nan with string literals.
from pyspark.sql.functions import col,isnan,when,count
df2 = spark_df_filled
df2 = df2.select([count(when(col(c).contains('None') | \
                            col(c).contains('NULL') | \
                            (col(c) == '' ) | \
                            col(c).isNull() | \
                            isnan(c), c 
                           )).alias(c)
                    for c in df2.columns])
df2.show()

In [None]:
import sys
###backward fill
from pyspark.sql import Window
from pyspark.sql.functions import last, first

# define the window
window = Window.partitionBy('City')\
               .orderBy('Date')\
               .rowsBetween(-sys.maxsize , 0)

# define the forward-filled column
filled_column = last(spark_df_filled['PM10'], ignorenulls=True).over(window)

# do the fill
spark_df_filled = spark_df_filled.withColumn('PM10', filled_column)

# show off our glorious achievements
spark_df_filled.orderBy('City', 'Date').show(10) 
spark_df_filled.show()



# define the window
window = Window.partitionBy('City')\
               .orderBy('Date')\
               .rowsBetween(0, sys.maxsize)

# define the forward-filled column
filled_column= first(spark_df_filled['PM10'], ignorenulls=True).over(window)

# do the fill
spark_df_filled = spark_df_filled.withColumn('PM10', filled_column)

# show off our glorious achievements
spark_df_filled.orderBy('City', 'Date').show(10) 
spark_df_filled.show()

In [None]:
# First, we have to register the DataFrame as a SQL temporary view.

dff= spark_df_filled
dff.createOrReplaceTempView('pollution')

results = spark.sql("SELECT * FROM pollution where pm10 is null")
results.show()

In [None]:
dff.filter(dff.PM10.isNull()).show()

In [None]:
from pyspark.sql.functions import mean
mean_pm10 = dff.select(mean(dff['PM10'])).collect()
mean_pm10[0][0]


In [None]:


dff1 = dff.na.fill(mean_pm10[0][0], subset=['PM10'])
dff1.filter(dff1.PM10.isNull()).show()

In [None]:
dff1.show()

In [None]:

#dff.filter(dff.PM25.isNull()).show()
#from pyspark.sql.functions import isnan, when, count, col

#dff.select([count(when(isNull(c), c)).alias(c) for c in dff.columns]).show()

# Find count for empty, None, Null, Nan with string literals.
from pyspark.sql.functions import col,isnan,when,count

dffs = dff1.select([count(when(col(c).contains('None') | \
                            col(c).contains('NULL') | \
                            (col(c) == '' ) | \
                            col(c).isNull() | \
                            isnan(c), c 
                           )).alias(c)
                    for c in dff.columns])
dffs.show()

In [None]:
# To Get Year from date or Time column
spark_df_filled = dff1
spark_df_filled1 = spark_df_filled.withColumn("year",year("Date"))


In [None]:
dff2= spark_df_filled1
dff2.show()



In [None]:
dff2.createOrReplaceTempView('pollution')

results = spark.sql("SELECT * FROM pollution where PM10 is null")
results.show()

In [None]:
dff2.count()
dff2.show()

In [None]:
#df1.show() dff5.show()

In [None]:
# Let's collect the average. You'll notice that the collection returns the average in an interesting format.
mean_no2 = dff2.select(mean(dff2['NO2'])).collect()
mean_so2 = dff2.select(mean(dff2['SO2'])).collect()
mean_o3 = dff2.select(mean(dff2['O3'])).collect()
#mean_pm10
mean_no2[0][0]
dff3 = dff2.na.fill(mean_no2[0][0], subset=['NO2'])
mean_so2[0][0]
dff4 = dff3.na.fill(mean_so2[0][0], subset=['SO2'])
mean_o3[0][0]
dff5 = dff4.na.fill(mean_o3[0][0], subset=['O3'])



In [None]:

#df1=df1.drop('date')
from pyspark.sql.functions import col,isnan,when,count

dff6 = dff5.select([count(when(col(c).contains('None') | \
                            col(c).contains('NULL') | \
                            (col(c) == '' ) | \
                            col(c).isNull() | \
                            isnan(c), c 
                           )).alias(c)
                    for c in dff5.columns])
dff6.show()


In [None]:

dff5.show()

In [None]:
# Let's collect the average. You'll notice that the collection returns the average in an interesting format.
#mean_pm10 = df.select(mean(df['PM10'])).collect()
#mean_pm10
#mean_pm10[0][0]
#df.filter(df.PM10.isNull()).show()
#df1 = df1.na.fill(mean_pm10[0][0], subset=['PM10'])
#df1.filter(df1.PM10.isNull()).show()

In [None]:

dftr=dff5.drop('NO')
dftr.show()

In [None]:


dftr=dftr.drop('NOX')
dftr=dftr.drop('NH3')
dftr=dftr.drop('CO')
dftr=dftr.drop('benzene')
dftr=dftr.drop('toluene')
dftr=dftr.drop('Xylene')

In [None]:
dftr.show()
#df1=df1.drop('date')
from pyspark.sql.functions import col,isnan,when,count

dftr1 = dftr.select([count(when(col(c).contains('None') | \
                            col(c).contains('NULL') | \
                            (col(c) == '' ) | \
                            col(c).isNull() | \
                            isnan(c), c 
                           )).alias(c)
                    for c in dftr.columns])
dftr1.show()

In [None]:
#df1 = df1.na.fill(mean_pm10[0][0], subset=['PM10'])
#dftr2.filter(dftr.AQI.isNull()).show()

In [None]:
#Replace 0 for null on only population column 
#df2 = df2.na.fill(value=0,subset=["AQI"]).show()


In [None]:
dftr.show()

In [None]:
import sys
from pyspark.sql.window import Window
import pyspark.sql.functions as func
dftr.withColumn("AQIBucket1", func.last('AQIBucket', True).over(Window.partitionBy('City').orderBy('year').rowsBetween(-sys.maxsize, 0))).show()


In [None]:

###backward fill
from pyspark.sql import Window
from pyspark.sql.functions import first

# define the window
window = Window.partitionBy('City')\
               .orderBy('Date')\
               .rowsBetween(0, sys.maxsize)

# define the forward-filled column
filled_column = first(dftr['AQIBucket'], ignorenulls=True).over(window)

# do the fill
spark_df_filled = dftr.withColumn('AQIBucket', filled_column)

# show off our glorious achievements
spark_df_filled.orderBy('City', 'Date').show(10) 

In [None]:
spark_df_filled.show()

In [None]:
# First, we have to register the DataFrame as a SQL temporary view.
spark_df_filled.createOrReplaceTempView('pollution')

results2 = spark.sql("SELECT * FROM pollution where AQIBucket is null")
results2.show()

In [None]:
###backward fill
from pyspark.sql import Window
from pyspark.sql.functions import last

# define the window
window = Window.partitionBy('City')\
               .orderBy('Date')\
               .rowsBetween(-sys.maxsize ,0 )

# define the forward-filled column
filled_column = last(spark_df_filled['AQIBucket'], ignorenulls=True).over(window)

# do the fill
spark_df_filled2 = spark_df_filled.withColumn('AQIBucket', filled_column)

# show off our glorious achievements
spark_df_filled2.orderBy('City', 'Date').show(10)

In [None]:
# First, we have to register the DataFrame as a SQL temporary view.
spark_df_filled2.createOrReplaceTempView('pollution')

results3 = spark.sql("SELECT * FROM pollution where AQIBucket is null")
results3.show()

In [None]:
# First, we have to register the DataFrame as a SQL temporary view.
spark_df_filled2.createOrReplaceTempView('pollution')

results4 = spark.sql("SELECT AQIBucket, count(AQIBucket),city,year FROM pollution group by AQIbucket,city,year")
results4.show()

In [None]:
###check the AQI for the rows, and update the nulls

spark_df_filled2.filter(spark_df_filled2.AQI.isNull()).show()

In [None]:
###backward fill
from pyspark.sql import Window
from pyspark.sql.functions import first

# define the window
window = Window.partitionBy('City')\
               .orderBy('AQIBucket')\
               .rowsBetween(0, sys.maxsize)

# define the forward-filled column
filled_column = first(spark_df_filled2['AQI'], ignorenulls=True).over(window)

# do the fill
spark_df_filled3 = spark_df_filled2.withColumn('AQII', filled_column)

# show off our glorious achievements
spark_df_filled3.orderBy('City', 'AQII').show(10) 

In [None]:

spark_df_filled3.filter(spark_df_filled3.AQII.isNull()).show()

In [None]:
# First, we have to register the DataFrame as a SQL temporary view.
spark_df_filled3.createOrReplaceTempView('pollution')

results4 = spark.sql("SELECT count(AQIBucket),AQIBucket,city,year FROM pollution group by AQIbucket,city,year")
results4.show()

In [None]:
spark_df_filled3.show()

In [None]:
##transform with String Indexer and OneHotEncoder

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder


indexer = StringIndexer(inputCol="AQIBucket", outputCol="AQIB_index").fit(spark_df_filled3)
spark_df_ind = indexer.transform(spark_df_filled3)
spark_df_ind.show()


#df3 = encoded


In [None]:
# First, we have to register the DataFrame as a SQL temporary view.
spark_df_ind.createOrReplaceTempView('pollution')

results4 = spark.sql("SELECT AQIBucket ,AQIB_index, count(AQIB_index), city,year FROM pollution group by AQIBucket, AQIB_index, city,year")
results4.show()
result = spark.sql("Select * from pollution")
result.show()

In [None]:
###one hot encoder
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder

encoder = OneHotEncoder(inputCol="AQIB_index", outputCol="AQIB_vec")
ohe = encoder.fit(spark_df_ind) # indexer is the existing dataframe, see the question
encoded = ohe.transform(spark_df_ind)
encoded.show()

In [None]:

spark_df_filled3.dtypes


In [None]:
catCols = ['AQIBucket']
numCols = ['PM25','PM10','NO2','SO2','O3','AQII']

In [None]:
print(numCols)
print(catCols)


In [None]:
spark_df_filled3.show()

In [None]:
from pyspark.ml.feature import VectorAssembler, StandardScaler

va = VectorAssembler(inputCols=numCols, outputCol="SS_features")


In [None]:
temp_train = va.transform(spark_df_filled3)

temp_train.show(2, truncate=False)



In [None]:
temp_train.select("SS_features").show(5,truncate=False)

In [None]:
ss=StandardScaler(inputCol="SS_features", outputCol="Scaled")
train1 = ss.fit(temp_train).transform(train)
train1.select("Scaled").show(5,truncate=False)

In [None]:
from pyspark.ml.feature import MinMaxScaler
mms = MinMaxScaler(inputCol="SS_features", outputCol="MMScaled")
train = mms.fit(temp_train).transform(temp_train)
train.select("MMScaled").show(5)

In [None]:
train.show(2, truncate=False)

In [None]:

#dataframe columns 
encoded.columns

encoded.show()

In [None]:
from pyspark.ml.feature import VectorAssembler

inputCols = [
 'PM25',
 'year',
 'AQIB_index',
 'AQIB_vec']

outputCol = "features"
df_va = VectorAssembler(inputCols = inputCols, outputCol = outputCol)
encoded = df_va.transform(encoded)
encoded.select(['features']).toPandas().head(5)

