# CalCOFI
### Over 60 years of oceanographic data
#### Dataset: https://www.kaggle.com/datasets/sohier/calcofi
#### Table Info: https://calcofi.org/data/oceanographic-data/bottle-database/

## EDA and Data Preparation

In [1]:
from pyspark.sql.functions import *

#### Load Data

In [2]:
''' Loading in spark '''
import pyspark
from pyspark.sql import SparkSession

conf = pyspark.SparkConf().setAll([
    ('spark.master', 'local[1]'), 
    ('spark.app.name', 'App Name')])
    
spark = SparkSession.builder.config(conf=conf).getOrCreate()
spark.version

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
2022-05-22 00:53:27,037 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


'3.2.1'

In [3]:
''' Read the data '''
# bottle = spark.read.csv("hdfs:///bottle.csv", header=True, inferSchema=True).cache() ##leslie and katie's path
bottle = spark.read.csv("file:///home/work/Final/bottle.csv", header=True, inferSchema=True).cache() ##karina's path
maxRows = bottle.count()
print("There are", maxRows, "rows in the initial dataframe")

2022-05-22 00:53:39,297 WARN util.package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.

There are 864863 rows in the initial dataframe



In [4]:
''' View the inferred schema '''
print("There are", len(bottle.columns), "columns in the initial dataframe")
bottle.printSchema()

There are 74 columns in the initial dataframe
root
 |-- Cst_Cnt: integer (nullable = true)
 |-- Btl_Cnt: integer (nullable = true)
 |-- Sta_ID: string (nullable = true)
 |-- Depth_ID: string (nullable = true)
 |-- Depthm: integer (nullable = true)
 |-- T_degC: double (nullable = true)
 |-- Salnty: double (nullable = true)
 |-- O2ml_L: double (nullable = true)
 |-- STheta: double (nullable = true)
 |-- O2Sat: double (nullable = true)
 |-- Oxy_µmol/Kg: double (nullable = true)
 |-- BtlNum: integer (nullable = true)
 |-- RecInd: integer (nullable = true)
 |-- T_prec: integer (nullable = true)
 |-- T_qual: integer (nullable = true)
 |-- S_prec: integer (nullable = true)
 |-- S_qual: integer (nullable = true)
 |-- P_qual: integer (nullable = true)
 |-- O_qual: integer (nullable = true)
 |-- SThtaq: integer (nullable = true)
 |-- O2Satq: integer (nullable = true)
 |-- ChlorA: double (nullable = true)
 |-- Chlqua: integer (nullable = true)
 |-- Phaeop: double (nullable = true)
 |-- Phaqua: in

In [5]:
''' See a snippet of what this dataframe looks like '''
bottle.show(2)

+-------+-------+-----------+--------------------+------+------+------+------+------+-----+-----------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+-----+----+------+------+-----+----+-----+----+-----+----+------+------+------+------+------+------+------+------+------+------+------+------+------+------+-------+------+--------+----------+-------+-----+-------+----+-------+------+-----+-----+-----+-----+------+-------+------+------+----+----+----+----+----+----+-------------------+
|Cst_Cnt|Btl_Cnt|     Sta_ID|            Depth_ID|Depthm|T_degC|Salnty|O2ml_L|STheta|O2Sat|Oxy_µmol/Kg|BtlNum|RecInd|T_prec|T_qual|S_prec|S_qual|P_qual|O_qual|SThtaq|O2Satq|ChlorA|Chlqua|Phaeop|Phaqua|PO4uM|PO4q|SiO3uM|SiO3qu|NO2uM|NO2q|NO3uM|NO3q|NH3uM|NH3q|C14As1|C14A1p|C14A1q|C14As2|C14A2p|C14A2q|DarkAs|DarkAp|DarkAq|MeanAs|MeanAp|MeanAq|IncTim|LightP|R_Depth|R_TEMP|R_POTEMP|R_SALINITY|R_SIGMA|R_SVA|R_DYNHT|R_O2|R_O2Sat|R_SIO3|R_PO4|R_NO3|R_NO2|R_NH4|R_CH

#### Remove columns that are not needed

Removing the four string columns because they aren't useful for our purposes
* Sta_ID: Line and Station
* Depth_ID: Uses the Cast_ID prefix ([Century]-[Year][Month][ShipCode]-[CastType][Julian Day]-[CastTime]-[Line][Sta]) but adds three additional variables: [Depth][Bottle]-[Rec_Ind]
* IncTim: Elapsed incubation time of the primary productivity experiment
* DIC Quality Comment: Quality Comment

Also removing the Cast and Bottle counts, which are essentially indexes (identifiers)
* 'Cst_Cnt': Auto-numbered Cast Count - all casts consecutively numbered. 1 is first station done
* 'Btl_Cnt': Auto-numbered Bottle count- all bottles ever sampled, consecutively numbered
* 'BtlNum': Bottle Number

In [6]:
''' Dropping unneeded columns and viewing two rows of the resulting dataframe '''
deleteList1 = ["Sta_ID","Depth_ID","IncTim","DIC Quality Comment","Cst_Cnt","Btl_Cnt","BtlNum"]
bottle = bottle.drop(*deleteList1)

print("There are now", len(bottle.columns), "columns and", bottle.count(), "rows")
bottle.show(2, truncate=False)

There are now 67 columns and 864863 rows
+------+------+------+------+------+-----+-----------+------+------+------+------+------+------+------+------+------+------+------+------+------+-----+----+------+------+-----+----+-----+----+-----+----+------+------+------+------+------+------+------+------+------+------+------+------+------+-------+------+--------+----------+-------+-----+-------+----+-------+------+-----+-----+-----+-----+------+-------+------+------+----+----+----+----+----+----+
|Depthm|T_degC|Salnty|O2ml_L|STheta|O2Sat|Oxy_µmol/Kg|RecInd|T_prec|T_qual|S_prec|S_qual|P_qual|O_qual|SThtaq|O2Satq|ChlorA|Chlqua|Phaeop|Phaqua|PO4uM|PO4q|SiO3uM|SiO3qu|NO2uM|NO2q|NO3uM|NO3q|NH3uM|NH3q|C14As1|C14A1p|C14A1q|C14As2|C14A2p|C14A2q|DarkAs|DarkAp|DarkAq|MeanAs|MeanAp|MeanAq|LightP|R_Depth|R_TEMP|R_POTEMP|R_SALINITY|R_SIGMA|R_SVA|R_DYNHT|R_O2|R_O2Sat|R_SIO3|R_PO4|R_NO3|R_NO2|R_NH4|R_CHLA|R_PHAEO|R_PRES|R_SAMP|DIC1|DIC2|TA1 |TA2 |pH2 |pH1 |
+------+------+------+------+------+-----+-------

#### Handle quality values

First we drop the rows flagged as bad data (quality code 8) in each of the columns below. Then we remove those columns, because we're using the quantity measurements rather than their quality codes:
* T_qual: Temperature Quality Code
* S_qual: Salinity Quality Code
* P_qual: Pressure Quality Code
* O_qual: Oxygen Quality Code
* 'O2Satq': Oxygen Saturation Quality Code
* 'Chlqua': Chlorophyll-a Quality Code
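* 'Phaeop': Phaeophytin concentration (a measurement, not a quality code)
* 'Phaqua': Phaeophytin Quality Code
* 'PO4uM': Micromoles Phosphate per liter of seawater (a measurement, not a quality code)
* 'PO4q': Phosphate Quality Code
* 'SiO3qu': Silicate Quality Code
* 'NO2q': Nitrite Quality Code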
* 'NO3q': Nitrate Quality Code
* 'NH3q': Ammonium Quality Code
* 'C14A1q': 14C As1 Quality Code
* 'C14A2q': 14C As2 Quality Code
* 'DarkAq': 14C Assimilation Dark Bottle Quality Code
* 'MeanAq': Mean 14C Assimilation Quality Code

In [7]:
deleteList2 = ['T_qual','S_qual','P_qual','O_qual','O2Satq','Chlqua','Phaeop','Phaqua','PO4uM','PO4q','SiO3qu','NO2q','NO3q','NH3q','C14A1q','C14A2q','DarkAq','MeanAq']
for i in deleteList2:
    numBadData = bottle.filter(bottle[i] == 8).count()
    print(f"Number of bad quality data for {i}: {numBadData}")

''' Dropping bad quality rows of quality columns'''
for i in deleteList2:
    bottle = bottle.filter(bottle[i] != 8)
    print(bottle.count())

Number of bad quality data for T_qual: 575
Number of bad quality data for S_qual: 1825
Number of bad quality data for P_qual: 0
Number of bad quality data for O_qual: 1455
Number of bad quality data for O2Satq: 2657
Number of bad quality data for Chlqua: 97
Number of bad quality data for Phaeop: 0
Number of bad quality data for Phaqua: 100
Number of bad quality data for PO4uM: 0
Number of bad quality data for PO4q: 302
Number of bad quality data for SiO3qu: 155
Number of bad quality data for NO2q: 2210
Number of bad quality data for NO3q: 2257
Number of bad quality data for NH3q: 0
Number of bad quality data for C14A1q: 7
Number of bad quality data for C14A2q: 7
Number of bad quality data for DarkAq: 7
Number of bad quality data for MeanAq: 7
22552
19286
19046
7692
5577
2919
0
0
0
0
0
0
0
0
0
0
0
0
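
The row count reaching 0 is consistent with how Spark SQL compares nulls. For a row where the quality column is null (most rows, as the drop from 864863 to 22552 rows after the first `T_qual` filter suggests), `bottle[i] != 8` evaluates to null rather than true, and `filter` keeps only rows whose condition is true. So every filter also discards all rows with no recorded code. A null-safe condition such as `bottle.filter(~col(i).eqNullSafe(8))` (`Column.eqNullSafe` is part of the PySpark API) would keep those rows. The plain-Python sketch below mimics this three-valued logic with hypothetical helper names; it illustrates the behavior and is not Spark's implementation.

```python
# Hypothetical helpers emulating SQL three-valued logic in plain Python.
# None stands in for a SQL NULL.

def sql_not_equal(value, target):
    """Return None (SQL NULL) if value is NULL, else the boolean value != target."""
    if value is None:
        return None
    return value != target


def null_safe_equal(value, target):
    """Mimic eqNullSafe / `<=>`: never NULL; NULL <=> 8 is False."""
    if value is None:
        return target is None
    return value == target


def spark_like_filter(rows, predicate):
    """Keep only rows whose predicate is exactly True, as DataFrame.filter does."""
    return [row for row in rows if predicate(row) is True]


# Toy quality-code column: 8 marks bad data, None means no code was recorded.
quality_codes = [None, None, 8, None, 2]

naive = spark_like_filter(quality_codes, lambda q: sql_not_equal(q, 8))
null_safe = spark_like_filter(quality_codes, lambda q: not null_safe_equal(q, 8))

print(naive)      # [2]
print(null_safe)  # [None, None, None, 2]
```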


In [8]:
''' Dropping quality/irrelevant columns and viewing two rows of the resulting dataframe '''
bottle = bottle.drop(*deleteList2)

print("There are now", len(bottle.columns), "columns and", bottle.count(), "rows")
bottle.show(2, truncate=False)

There are now 49 columns and 0 rows
+------+------+------+------+------+-----+-----------+------+------+------+------+------+------+-----+-----+-----+------+------+------+------+------+------+------+------+------+-------+------+--------+----------+-------+-----+-------+----+-------+------+-----+-----+-----+-----+------+-------+------+------+----+----+---+---+---+---+
|Depthm|T_degC|Salnty|O2ml_L|STheta|O2Sat|Oxy_µmol/Kg|RecInd|T_prec|S_prec|SThtaq|ChlorA|SiO3uM|NO2uM|NO3uM|NH3uM|C14As1|C14A1p|C14As2|C14A2p|DarkAs|DarkAp|MeanAs|MeanAp|LightP|R_Depth|R_TEMP|R_POTEMP|R_SALINITY|R_SIGMA|R_SVA|R_DYNHT|R_O2|R_O2Sat|R_SIO3|R_PO4|R_NO3|R_NO2|R_NH4|R_CHLA|R_PHAEO|R_PRES|R_SAMP|DIC1|DIC2|TA1|TA2|pH2|pH1|
+------+------+------+------+------+-----+-----------+------+------+------+------+------+------+-----+-----+-----+------+------+------+------+------+------+------+------+------+-------+------+--------+----------+-------+-----+-------+----+-------+------+-----+-----+-----+-----+------+-------+---

#### Remove columns with low data count

We start by counting the number of NaNs and nulls in each column and reporting them in a new dataframe. Then the columns with fewer than 200,000 non-null values are deleted.
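
The selection rule keeps a column only if its non-null count (total rows minus its null count) is at least 200,000, which is the same as the notebook's test `nullCount > maxRows - 200000`. Below is a plain-Python sketch with a hypothetical helper and made-up counts. Note that the notebook's `maxRows` is the original row count; once rows have been filtered, the current `bottle.count()` would be the accurate total to compare against.

```python
def columns_to_drop(null_counts, total_rows, min_non_null=200_000):
    """Return the columns whose non-null count (total_rows - nulls) is below min_non_null.

    null_counts maps column name -> number of null/NaN values, like the
    single row produced by getNullCounts.
    """
    return [name for name, nulls in null_counts.items()
            if total_rows - nulls < min_non_null]


# Hypothetical counts for illustration (not taken from the dataset).
example_counts = {"R_TEMP": 10_000, "R_NH4": 800_000, "DIC1": 862_000}
print(columns_to_drop(example_counts, total_rows=864_863))  # ['R_NH4', 'DIC1']
```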

In [9]:
''' Counting the number of null/NaN rows per column and outputting that in a new dataframe '''

def getNullCounts(df):
    ''' Return a one-row dataframe with the number of null/NaN values in each column of df '''
    return df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns])

nullCounter = getNullCounts(bottle)
nullCounter.show()

+------+------+------+------+------+-----+-----------+------+------+------+------+------+------+-----+-----+-----+------+------+------+------+------+------+------+------+------+-------+------+--------+----------+-------+-----+-------+----+-------+------+-----+-----+-----+-----+------+-------+------+------+----+----+---+---+---+---+
|Depthm|T_degC|Salnty|O2ml_L|STheta|O2Sat|Oxy_µmol/Kg|RecInd|T_prec|S_prec|SThtaq|ChlorA|SiO3uM|NO2uM|NO3uM|NH3uM|C14As1|C14A1p|C14As2|C14A2p|DarkAs|DarkAp|MeanAs|MeanAp|LightP|R_Depth|R_TEMP|R_POTEMP|R_SALINITY|R_SIGMA|R_SVA|R_DYNHT|R_O2|R_O2Sat|R_SIO3|R_PO4|R_NO3|R_NO2|R_NH4|R_CHLA|R_PHAEO|R_PRES|R_SAMP|DIC1|DIC2|TA1|TA2|pH2|pH1|
+------+------+------+------+------+-----+-----------+------+------+------+------+------+------+-----+-----+-----+------+------+------+------+------+------+------+------+------+-------+------+--------+----------+-------+-----+-------+----+-------+------+-----+-----+-----+-----+------+-------+------+------+----+----+---+---+---+---

In [10]:
''' Deleting columns with fewer than 200000 non-nulls '''
# A column has fewer than 200000 non-nulls when its null count exceeds maxRows - 200000
thresh = maxRows - 200000
nullRow = nullCounter.first()  # nullCounter holds a single row of per-column null counts
deleteList3 = [c for c in nullCounter.columns if nullRow[c] > thresh]
bottle = bottle.drop(*deleteList3)

print("There are now", len(bottle.columns), "columns and", bottle.count(), "rows")
bottle.show(2, truncate=False)

There are now 49 columns and 0 rows
+------+------+------+------+------+-----+-----------+------+------+------+------+------+------+-----+-----+-----+------+------+------+------+------+------+------+------+------+-------+------+--------+----------+-------+-----+-------+----+-------+------+-----+-----+-----+-----+------+-------+------+------+----+----+---+---+---+---+
|Depthm|T_degC|Salnty|O2ml_L|STheta|O2Sat|Oxy_µmol/Kg|RecInd|T_prec|S_prec|SThtaq|ChlorA|SiO3uM|NO2uM|NO3uM|NH3uM|C14As1|C14A1p|C14As2|C14A2p|DarkAs|DarkAp|MeanAs|MeanAp|LightP|R_Depth|R_TEMP|R_POTEMP|R_SALINITY|R_SIGMA|R_SVA|R_DYNHT|R_O2|R_O2Sat|R_SIO3|R_PO4|R_NO3|R_NO2|R_NH4|R_CHLA|R_PHAEO|R_PRES|R_SAMP|DIC1|DIC2|TA1|TA2|pH2|pH1|
+------+------+------+------+------+-----+-----------+------+------+------+------+------+------+-----+-----+-----+------+------+------+------+------+------+------+------+------+-------+------+--------+----------+-------+-----+-------+----+-------+------+-----+-----+-----+-----+------+-------+---

#### Remove null values from chlorophyll column

Because this is the target column, we can only use the non-null rows. We also get rid of the duplicate chlorophyll column `ChlorA`, keeping `R_CHLA` because it has fewer null values (though only by a few).

In [11]:
''' Illustrating that we have two target columns that are essentially duplicates of each other '''
bottle.corr("ChlorA","R_CHLA")

nan
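
Pearson correlation divides the covariance by the product of the two standard deviations, so it is undefined when there are no rows to compare (or when a column is constant). Since the quality-code filtering left the dataframe with 0 rows, the `nan` above is consistent with that. A plain-Python sketch with a hypothetical helper `pearson` (not Spark's implementation):

```python
import math


def pearson(xs, ys):
    """Pearson correlation; NaN when undefined (fewer than 2 pairs or zero variance)."""
    n = len(xs)
    if n != len(ys):
        raise ValueError("xs and ys must have the same length")
    if n < 2:
        return float("nan")
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    if var_x == 0 or var_y == 0:
        return float("nan")
    return cov / math.sqrt(var_x * var_y)


print(pearson([], []))                          # nan: no rows to correlate
print(pearson([1.0, 2.0, 3.0], [2.0, 4.0, 6.0]))  # 1.0
```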

Because we only need one of these columns, we can delete the other. Then we drop all null rows of the R_CHLA column since it's our target.

In [12]:
''' Looking at which of the two target columns has more NaNs in order to decide which one to delete '''
nullCounter.select(*["ChlorA","R_CHLA"]).show()

+------+------+
|ChlorA|R_CHLA|
+------+------+
|     0|     0|
+------+------+



In [13]:
''' Delete the ChlorA column and drop all NaN rows in the R_CHLA column '''
bottle = bottle.drop("ChlorA")
bottle = bottle.dropna(subset="R_CHLA")

print("There are now", len(bottle.columns), "columns and", bottle.count(), "rows")
bottle.show(2, truncate=False)

There are now 48 columns and 0 rows
+------+------+------+------+------+-----+-----------+------+------+------+------+------+-----+-----+-----+------+------+------+------+------+------+------+------+------+-------+------+--------+----------+-------+-----+-------+----+-------+------+-----+-----+-----+-----+------+-------+------+------+----+----+---+---+---+---+
|Depthm|T_degC|Salnty|O2ml_L|STheta|O2Sat|Oxy_µmol/Kg|RecInd|T_prec|S_prec|SThtaq|SiO3uM|NO2uM|NO3uM|NH3uM|C14As1|C14A1p|C14As2|C14A2p|DarkAs|DarkAp|MeanAs|MeanAp|LightP|R_Depth|R_TEMP|R_POTEMP|R_SALINITY|R_SIGMA|R_SVA|R_DYNHT|R_O2|R_O2Sat|R_SIO3|R_PO4|R_NO3|R_NO2|R_NH4|R_CHLA|R_PHAEO|R_PRES|R_SAMP|DIC1|DIC2|TA1|TA2|pH2|pH1|
+------+------+------+------+------+-----+-----------+------+------+------+------+------+-----+-----+-----+------+------+------+------+------+------+------+------+------+-------+------+--------+----------+-------+-----+-------+----+-------+------+-----+-----+-----+-----+------+-------+------+------+----+----+

#### Remove "duplicate" columns

As the column list below shows, several of the remaining columns are essentially duplicates of each other, often differing only in units of measurement. The descriptions of the remaining columns that follow help us decide which features to compare for potential deletion:

In [14]:
''' See what columns we still have '''
print(bottle.columns)

['Depthm', 'T_degC', 'Salnty', 'O2ml_L', 'STheta', 'O2Sat', 'Oxy_µmol/Kg', 'RecInd', 'T_prec', 'S_prec', 'SThtaq', 'SiO3uM', 'NO2uM', 'NO3uM', 'NH3uM', 'C14As1', 'C14A1p', 'C14As2', 'C14A2p', 'DarkAs', 'DarkAp', 'MeanAs', 'MeanAp', 'LightP', 'R_Depth', 'R_TEMP', 'R_POTEMP', 'R_SALINITY', 'R_SIGMA', 'R_SVA', 'R_DYNHT', 'R_O2', 'R_O2Sat', 'R_SIO3', 'R_PO4', 'R_NO3', 'R_NO2', 'R_NH4', 'R_CHLA', 'R_PHAEO', 'R_PRES', 'R_SAMP', 'DIC1', 'DIC2', 'TA1', 'TA2', 'pH2', 'pH1']


Chlorophyll
* 'ChlorA': Acetone extracted chlorophyll-a measured fluorometrically
* 'R_CHLA': Reported Chlorophyll-a (micrograms per liter)

Depth
* 'Depthm': Depth in meters
* 'R_Depth': Reported Depth (from pressure) in meters

Water density
* 'STheta': Potential Density of Water
* 'R_SIGMA': Reported Potential Density of water

Silicate
* 'SiO3uM': Micromoles Silicate per liter of seawater
* 'R_SIO3': Reported Silicate Concentration

Nitrite
* 'NO2uM': Micromoles Nitrite per liter of seawater
* 'R_NO2': Reported Nitrite Concentration

Nitrate
* 'NO3uM': Micromoles Nitrate per liter of seawater
* 'R_NO3': Reported Nitrate Concentration

Salinity
* 'Salnty': Practical Salinity Scale, 1978 (UNESCO, 1981a); Salinity of water
* 'R_SALINITY': Reported Salinity (from Specific Volume Anomaly, M³/Kg)

O2 saturation
* 'O2Sat': Percent Saturation; Oxygen Saturation
* 'R_O2Sat': Percent Saturation; Reported Oxygen Saturation

Oxygen
* 'O2ml_L': Oxygen in mL/L; Milliliters of dissolved oxygen per Liter seawater
* 'Oxy_µmol/Kg': Oxygen in micro moles per kilogram of seawater
* 'R_O2': Reported milliliters of oxygen per liter of seawater

Temperature
* 'T_degC': Temperature of Water
* 'R_TEMP': Reported Temperature (Celsius)
* 'R_POTEMP': Reported Potential Temperature (Celsius)

Other
* 'S_prec': Salinity Units of Precision
* 'T_prec': Temperature Units of Precision
* 'RecInd': Record Indicator
* 'R_SVA': Reported Specific Volume Anomaly
* 'R_DYNHT': Reported Dynamic Height
* 'R_PO4': Reported Phosphate Concentration
* 'R_PHAEO': Reported Phaeophytin
* 'R_PRES': Pressure in decibars

In [15]:
''' Setting up a function that will streamline the comparison process '''
nullCounter = getNullCounts(bottle)

def psudoDuplicateCheck(features):
    nullCounter.select(*features).show()

    if len(features)==2:
        print("correlation:", bottle.corr(features[0],features[1]))
    elif len(features)==3:
        print("1&2 correlation:", bottle.corr(features[0],features[1]))
        print("2&3 correlation:", bottle.corr(features[1],features[2]))
        print("1&3 correlation:", bottle.corr(features[0],features[2]))

In [16]:
nullCounter.show()

+------+------+------+------+------+-----+-----------+------+------+------+------+------+-----+-----+-----+------+------+------+------+------+------+------+------+------+-------+------+--------+----------+-------+-----+-------+----+-------+------+-----+-----+-----+-----+------+-------+------+------+----+----+---+---+---+---+
|Depthm|T_degC|Salnty|O2ml_L|STheta|O2Sat|Oxy_µmol/Kg|RecInd|T_prec|S_prec|SThtaq|SiO3uM|NO2uM|NO3uM|NH3uM|C14As1|C14A1p|C14As2|C14A2p|DarkAs|DarkAp|MeanAs|MeanAp|LightP|R_Depth|R_TEMP|R_POTEMP|R_SALINITY|R_SIGMA|R_SVA|R_DYNHT|R_O2|R_O2Sat|R_SIO3|R_PO4|R_NO3|R_NO2|R_NH4|R_CHLA|R_PHAEO|R_PRES|R_SAMP|DIC1|DIC2|TA1|TA2|pH2|pH1|
+------+------+------+------+------+-----+-----------+------+------+------+------+------+-----+-----+-----+------+------+------+------+------+------+------+------+------+-------+------+--------+----------+-------+-----+-------+----+-------+------+-----+-----+-----+-----+------+-------+------+------+----+----+---+---+---+---+
|     0|     0|    

In [17]:
Depths = ["Depthm","R_Depth"]
print("Depths NaN count:")
psudoDuplicateCheck(Depths)

WD = ["STheta","R_SIGMA"]
print("\nWater density NaN count:")
psudoDuplicateCheck(WD)

Silicate = ["SiO3uM","R_SIO3"]
print("\nSilicate NaN count:")
psudoDuplicateCheck(Silicate)

Nitrite = ["NO2uM","R_NO2"]
print("\nNitrite NaN count:")
psudoDuplicateCheck(Nitrite)

Nitrate = ["NO3uM","R_NO3"]
print("\nNitrate NaN count:")
psudoDuplicateCheck(Nitrate)

Salinity = ["Salnty","R_SALINITY"]
print("\nSalinity NaN count:")
psudoDuplicateCheck(Salinity)

Saturation = ["O2Sat","R_O2Sat"]
print("\nO2 Saturation NaN count:")
psudoDuplicateCheck(Saturation)

Oxygen = ["O2ml_L","Oxy_µmol/Kg","R_O2"]
print("\nOxygen NaN count:")
psudoDuplicateCheck(Oxygen)

Temperature = ["T_degC","R_TEMP","R_POTEMP"]
print("\nTemperature NaN count:")
psudoDuplicateCheck(Temperature)

Depths NaN count:
+------+-------+
|Depthm|R_Depth|
+------+-------+
|     0|      0|
+------+-------+

correlation: nan

Water density NaN count:
+------+-------+
|STheta|R_SIGMA|
+------+-------+
|     0|      0|
+------+-------+

correlation: nan

Silicate NaN count:
+------+------+
|SiO3uM|R_SIO3|
+------+------+
|     0|     0|
+------+------+

correlation: nan

Nitrite NaN count:
+-----+-----+
|NO2uM|R_NO2|
+-----+-----+
|    0|    0|
+-----+-----+

correlation: nan

Nitrate NaN count:
+-----+-----+
|NO3uM|R_NO3|
+-----+-----+
|    0|    0|
+-----+-----+

correlation: nan

Salinity NaN count:
+------+----------+
|Salnty|R_SALINITY|
+------+----------+
|     0|         0|
+------+----------+

correlation: nan

O2 Saturation NaN count:
+-----+-------+
|O2Sat|R_O2Sat|
+-----+-------+
|    0|      0|
+-----+-------+

correlation: nan

Oxygen NaN count:
+------+-----------+----+
|O2ml_L|Oxy_µmol/Kg|R_O2|
+------+-----------+----+
|     0|          0|   0|
+------+-----------+----+

1&

For each group of near-duplicate columns with a correlation coefficient of 0.97 or higher, we now delete the column(s) with more nulls, keeping the one with the most non-null values.

In the case of a tie, we follow the pattern that is apparent in the no-tie cases: the "reported" columns (those starting with `R_`) have the higher non-null count. Therefore, if two columns have equal null counts and a high enough correlation, we delete the non-"reported" column.
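
The rule above can be written down for a pair of columns. This is a plain-Python sketch with a hypothetical helper `column_to_drop`: it takes the null counts and the correlation (for example, values read off the `nullCounter` table and `bottle.corr`) and returns the column to delete. The notebook applies the rule by hand, including to the three-column oxygen and temperature groups, which this pairwise sketch does not cover.

```python
def column_to_drop(col_a, col_b, null_counts, correlation, min_corr=0.97):
    """Return which of two near-duplicate columns to drop, or None to keep both.

    If the correlation is at least min_corr, drop the column with more nulls;
    on a tie, drop the one that is not a "reported" (R_-prefixed) column.
    """
    # NaN compares False with everything, so an undefined correlation keeps both
    if not correlation >= min_corr:
        return None
    nulls_a, nulls_b = null_counts[col_a], null_counts[col_b]
    if nulls_a != nulls_b:
        return col_a if nulls_a > nulls_b else col_b
    a_reported, b_reported = col_a.startswith("R_"), col_b.startswith("R_")
    if a_reported and not b_reported:
        return col_b
    if b_reported and not a_reported:
        return col_a
    return None  # tie that the R_ rule cannot break: decide by hand


# Hypothetical null counts, for illustration only.
counts = {"Depthm": 120, "R_Depth": 120, "O2ml_L": 900, "R_O2": 850}
print(column_to_drop("Depthm", "R_Depth", counts, 0.999))  # Depthm (tie, drop non-R_)
print(column_to_drop("O2ml_L", "R_O2", counts, 0.98))      # O2ml_L (more nulls)
print(column_to_drop("O2ml_L", "R_O2", counts, 0.90))      # None (not similar enough)
```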

In [18]:
''' Deleting columns that are duplicates of some other column '''
deleteList4 = ["Depthm","STheta","SiO3uM","NO2uM","NO3uM","Salnty","O2Sat","O2ml_L","Oxy_µmol/Kg","T_degC","R_POTEMP"]
bottle = bottle.drop(*deleteList4)

print("There are now", len(bottle.columns), "columns and", bottle.count(), "rows")
bottle.show(2, truncate=False)

There are now 37 columns and 0 rows
+------+------+------+------+-----+------+------+------+------+------+------+------+------+------+-------+------+----------+-------+-----+-------+----+-------+------+-----+-----+-----+-----+------+-------+------+------+----+----+---+---+---+---+
|RecInd|T_prec|S_prec|SThtaq|NH3uM|C14As1|C14A1p|C14As2|C14A2p|DarkAs|DarkAp|MeanAs|MeanAp|LightP|R_Depth|R_TEMP|R_SALINITY|R_SIGMA|R_SVA|R_DYNHT|R_O2|R_O2Sat|R_SIO3|R_PO4|R_NO3|R_NO2|R_NH4|R_CHLA|R_PHAEO|R_PRES|R_SAMP|DIC1|DIC2|TA1|TA2|pH2|pH1|
+------+------+------+------+-----+------+------+------+------+------+------+------+------+------+-------+------+----------+-------+-----+-------+----+-------+------+-----+-----+-----+-----+------+-------+------+------+----+----+---+---+---+---+
+------+------+------+------+-----+------+------+------+------+------+------+------+------+------+-------+------+----------+-------+-----+-------+----+-------+------+-----+-----+-----+-----+------+-------+------+------+----+--

In [19]:
''' Checking the null counts of the remaining columns '''
getNullCounts(bottle).show()

+------+------+------+------+-----+------+------+------+------+------+------+------+------+------+-------+------+----------+-------+-----+-------+----+-------+------+-----+-----+-----+-----+------+-------+------+------+----+----+---+---+---+---+
|RecInd|T_prec|S_prec|SThtaq|NH3uM|C14As1|C14A1p|C14As2|C14A2p|DarkAs|DarkAp|MeanAs|MeanAp|LightP|R_Depth|R_TEMP|R_SALINITY|R_SIGMA|R_SVA|R_DYNHT|R_O2|R_O2Sat|R_SIO3|R_PO4|R_NO3|R_NO2|R_NH4|R_CHLA|R_PHAEO|R_PRES|R_SAMP|DIC1|DIC2|TA1|TA2|pH2|pH1|
+------+------+------+------+-----+------+------+------+------+------+------+------+------+------+-------+------+----------+-------+-----+-------+----+-------+------+-----+-----+-----+-----+------+-------+------+------+----+----+---+---+---+---+
|     0|     0|     0|     0|    0|     0|     0|     0|     0|     0|     0|     0|     0|     0|      0|     0|         0|      0|    0|      0|   0|      0|     0|    0|    0|    0|    0|     0|      0|     0|     0|   0|   0|  0|  0|  0|  0|
+------+------+-

#### Fill null values

In [20]:
''' Using the function from PA3 '''
from pyspark.ml.feature import Imputer

def fill_na(df, strategy):    
    imputer = Imputer(
        strategy=strategy,
        inputCols=df.columns, 
        outputCols=["{}_imputed".format(c) for c in df.columns]
    )
    
    new_df = imputer.fit(df).transform(df)
    
    ''' Select the newly created columns with all filled values '''
    new_df = new_df.select([c for c in new_df.columns if "imputed" in c])
    
    for col in new_df.columns:
        new_df = new_df.withColumnRenamed(col, col.split("_imputed")[0])
        
    return new_df
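
Conceptually, the `'mean'` strategy of `Imputer` computes each column's mean over its non-missing values (nulls and NaNs) and uses it as the fill value, the "surrogate". The plain-Python sketch below, with hypothetical helper names, shows why that cannot work when a column has no usable values at all, which is the case for every column once the dataframe has 0 rows, as the `surrogate cannot be computed` error from the next cell reports.

```python
import math


def is_missing(value):
    """True for None (SQL NULL) or a float NaN."""
    return value is None or (isinstance(value, float) and math.isnan(value))


def mean_impute(values):
    """Fill missing entries (None/NaN) with the mean of the present ones.

    Raises ValueError if every entry is missing (or the list is empty),
    because there is no mean to use as the fill value.
    """
    present = [v for v in values if not is_missing(v)]
    if not present:
        raise ValueError("surrogate cannot be computed: all values are missing")
    fill = sum(present) / len(present)
    return [fill if is_missing(v) else v for v in values]


print(mean_impute([1.0, None, 3.0, float("nan")]))  # [1.0, 2.0, 3.0, 2.0]
```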

In [21]:
''' Filling in the remaining null rows with the mean of the column and saving this newly filled in dataframe as a new variable '''
bottleTest = fill_na(bottle, 'mean')

print("There are still", len(bottle.columns), "columns and", bottle.count(), "rows")
bottleTest.show(2, truncate=False)

Py4JJavaError: An error occurred while calling o1708.fit.
: org.apache.spark.SparkException: surrogate cannot be computed. All the values in RecInd,T_prec,S_prec,SThtaq,NH3uM,C14As1,C14A1p,C14As2,C14A2p,DarkAs,DarkAp,MeanAs,MeanAp,LightP,R_Depth,R_TEMP,R_SALINITY,R_SIGMA,R_SVA,R_DYNHT,R_O2,R_O2Sat,R_SIO3,R_PO4,R_NO3,R_NO2,R_NH4,R_CHLA,R_PHAEO,R_PRES,R_SAMP,DIC1,DIC2,TA1,TA2,pH2,pH1 are Null, Nan or missingValue(NaN)
	at org.apache.spark.ml.feature.Imputer.fit(Imputer.scala:198)
	at org.apache.spark.ml.feature.Imputer.fit(Imputer.scala:114)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)


In [None]:
''' Double checking that all null values are filled '''
getNullCounts(bottleTest).show()

#### Lasso regression

> (my personal notes will be indented)
> 1. tried to use Lasso in spark, turns out it's deprecated, use LinearRegression instead
> 2. LinearRegression needs a label column (easy)
> 3. LinearRegression also needs a feature column (need help determining if this needs to be scaled before using lasso)
> 
> spark lasso: https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.mllib.regression.LassoWithSGD.html<br>
Warning: **"Use [pyspark.ml.regression.LinearRegression](https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.ml.regression.LinearRegression.html) with elasticNetParam = 1.0. Note the default regParam is 0.01 for LassoWithSGD, but is 0.0 for LinearRegression."**
> 
> need to specify a label column in order to use. default is to use a column *named* "label" but it *is* possible to have the model look for a specific name. however, renaming columns is something we've done in past PAs and i wanna use it to be like "HEY-YO WE'RE USING WHAT YOU SHOWED US :D"
> 
> **(need help here:)** need to create "features" column first (did this in PA3, filled NaNs with mean before creating "features" column), but not sure if we need to scale it before feeding to lasso AND not sure what alpha "regParam" to use (i thiiiiink regParam == alpha ? 😅 i'm actually not sure, but see the raw-cell below for more about which regParam to use)


To better estimate which columns are most important to our model, we use a lasso regression. Because Spark's `LassoWithSGD` is deprecated, we use `LinearRegression` with `elasticNetParam = 1.0`, which makes it equivalent to a Lasso (pure L1-penalized) regression.
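
For reference, the Spark ML documentation writes the regularized objective of `LinearRegression` as below, with $\lambda$ = `regParam` and $\alpha$ = `elasticNetParam`; with $\alpha = 1$ only the L1 (Lasso) term remains. Written this way, `regParam` plays roughly the role of `alpha` in scikit-learn's `Lasso`, whose objective has the same form, although Spark's internal standardization (`standardization=True`) means the two numbers are not directly interchangeable.

$$
\min_{w}\ \frac{1}{2n}\sum_{i=1}^{n}\left(w^{\top}x_i - y_i\right)^2 + \lambda\left(\alpha\lVert w\rVert_1 + \frac{1-\alpha}{2}\lVert w\rVert_2^2\right)
$$

$$
\alpha = 1:\qquad \min_{w}\ \frac{1}{2n}\sum_{i=1}^{n}\left(w^{\top}x_i - y_i\right)^2 + \lambda\lVert w\rVert_1
$$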

In order to use this, `label` and `features` columns need to be specified. We rename `R_CHLA` to `label` and create a features column with VectorAssembler.
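
`VectorAssembler` concatenates the listed input columns, in order, into a single vector per row, so position *i* of `features` (and later of `model.coefficients`) corresponds to the *i*-th name in the feature list. A plain-Python sketch of that bookkeeping, using a hypothetical helper `assemble` and made-up rows:

```python
def assemble(rows, label_col="label"):
    """Split dict rows into (feature_names, [(feature_vector, label), ...]).

    Feature order follows the row's column order with the label removed,
    mirroring `features = bottleTest.columns; features.remove('label')`.
    """
    if not rows:
        return [], []
    feature_names = [name for name in rows[0] if name != label_col]
    data = [([row[name] for name in feature_names], row[label_col]) for row in rows]
    return feature_names, data


# Made-up rows for illustration only.
toy_rows = [
    {"R_TEMP": 15.2, "label": 0.3, "R_O2": 5.1},
    {"R_TEMP": 9.8, "label": 0.05, "R_O2": 3.4},
]
names, data = assemble(toy_rows)
print(names)  # ['R_TEMP', 'R_O2']
print(data)   # [([15.2, 5.1], 0.3), ([9.8, 3.4], 0.05)]
```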

In [None]:
''' Rename the target chlorophyll column to 'label' '''
bottleTest = bottleTest.withColumnRenamed('R_CHLA', 'label')

In [None]:
''' Creating 'features' column '''
from pyspark.ml.feature import VectorAssembler

''' (intermediate step) make a list of column names other than 'label', AKA a list of the features '''
features = bottleTest.columns
features.remove('label')

# bottleTest = VectorAssembler(outputCol="features_unscaled").setInputCols(features).transform(bottleTest)
bottleTest = VectorAssembler(outputCol="features").setInputCols(features).transform(bottleTest)

bottleTest.show(2, truncate=False)

In [None]:
''' Applying lasso regression with regParam=0.0025 (the L1 regularization strength, Spark's counterpart to alpha) '''
from pyspark.ml.regression import LinearRegression

lr = LinearRegression(elasticNetParam = 1.0, regParam=0.0025, solver="normal", maxIter=1000, standardization=True)
model = lr.fit(bottleTest)

##### regParam (alpha?) tinkering

In [None]:
print("when regParam=" + str(lr.getRegParam()) + ", model.coefficients=")
model.coefficients

> okay i'm not sure how to adjust alpha like we would with a regular non-spark LassoRegression, but changing regParam did change which columns had nonzero coefficients which is what we did for regular Lasso
> 
> all of the model.coefficients align with the six columns `'R_SALINITY','R_DYNHT','R_O2','R_SIO3','R_NO2','R_PHAEO'` but otherwise include at least one more that doesn't appear in all other regParam tests
>
> | regParam | nonzero coeffs | S_prec? | R_TEMP? | R_SVA? | R_O2Sat? | R_NO3? |
> |----------|----------------|---------|---------|--------|----------|--------|
> | 0.0      |       17       |   Yes   |   Yes   |   Yes  |    Yes   |  Yes  |
> | 0.001    |       13       |   Yes   |   Yes   |        |    Yes   |  Yes  |
> | 0.002    |       10       |   Yes   |   Yes   |        |    Yes   |  Yes  |
> | 0.003    |       10       |   Yes   |   Yes   |        |    Yes   |  Yes  |
> | 0.005    |        9       |   Yes   |   Yes   |        |          |       |
> | 0.008    |        8       |         |   Yes   |        |          |       |
> | 0.010    |        7       |         |   Yes   |        |          |       |
> | 0.013    |        8       |         |   Yes   |   Yes  |          |       |
> | 0.015    |        8       |         |   Yes   |   Yes  |          |       |
> | 0.018    |        7       |         |         |   Yes  |          |       |
> | 0.020    |        7       |         |         |   Yes  |          |       |
>
> also it looks like scaling doesn't actually change much other than the absolute value of the coefficients
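
The table's pattern, with more coefficients becoming exactly zero as `regParam` grows, is the signature of the L1 penalty. One way to see why is the soft-thresholding operator used by coordinate-descent Lasso solvers (such as scikit-learn's): a coefficient whose unpenalized update is smaller in magnitude than the penalty is set to exactly zero, and larger ones are shrunk toward zero. Spark uses its own optimizer, so this illustrates the mechanism rather than Spark's code. On scaling: with `standardization=True`, Spark standardizes the features internally before applying the penalty and reports coefficients on the original scale, so pre-scaling the features mainly changes the magnitude of the reported coefficients, consistent with the observation above. The helper and the raw coefficient values below are hypothetical.

```python
def soft_threshold(z, lam):
    """Soft-thresholding S(z, lam) = sign(z) * max(|z| - lam, 0)."""
    if z > lam:
        return z - lam
    if z < -lam:
        return z + lam
    return 0.0


# Hypothetical unpenalized coefficients for five standardized features.
raw = [0.9, -0.4, 0.15, 0.05, -0.02]

nonzero_counts = []
for lam in (0.0, 0.1, 0.5):
    shrunk = [soft_threshold(z, lam) for z in raw]
    nonzero_counts.append(sum(1 for s in shrunk if s != 0))

print(nonzero_counts)  # [5, 3, 1]
```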

##### using the coeff list

> here are the "most useful" features. i'm not sure if i should be deleting the "useless" ones or just dropping nulls from the other columns

In [None]:
''' Find the "most useful" and "not useful" features, according to the lasso regression when regParam=0.0025 '''

coeff = model.coefficients
usefulFeatures = []
deleteList5 = []

print("(For-loop of", len(coeff), "iterations)")
for i in range(len(coeff)):
    if coeff[i] != 0:
        usefulFeatures.append(features[i])
    else:
        deleteList5.append(features[i])

print("When regParam="+str(lr.getRegParam())+", this lasso model indicates the most useful columns for predicting chlorophyll are:", usefulFeatures)

In [None]:
''' Moving back to dataframe before nulls were filled with means, drop the null rows of the "most useful" features '''
bottle = bottle.dropna(subset=usefulFeatures)

In [None]:
''' Moving back to dataframe before nulls were filled with means, drop the "not useful" features '''
bottle = bottle.drop(*deleteList5)

In [None]:
''' View the dimensions and two rows of the resulting dataframe '''
print("Moving back to dataframe before nulls were filled with means, there are now", len(bottle.columns), "columns and", bottle.count(), "rows")
bottle.show(2, truncate=False)

#### Make sure column values are the right type

In [None]:
''' Check the dtypes of each column '''
bottle.printSchema()

In [None]:
''' Row count, and comparison to original row count '''
print("There are now", len(bottle.columns), "columns and", bottle.count(), "rows (which is", maxRows-bottle.count(), "fewer than the original row count)")

#### Summary Statistics and graphs

In [None]:
''' See the detailed numerical description of the current dataframe '''
bottle.describe().show()

In [None]:
getNullCounts(bottle).show()

## Questions

#### Which factors are chlorophyll levels most dependent on?
#### Can we use these factors as features in a model that accurately predicts chlorophyll levels in a marine ecosystem?
#### Can the few simple, measurable factors we found indicate whether an ocean ecosystem is healthy and sustainable, based on the measured chlorophyll abundance?
#### Can we predict chlorophyll abundance without having to measure it?

## Feature Selection and Target Cleaning

#### Distribution of chlorophyll levels and summary statistics

In [None]:
bottle.agg(min('R_CHLA'), max('R_CHLA'), mean('R_CHLA'), stddev('R_CHLA'), count('R_CHLA'), skewness('R_CHLA')).show()

##### The summary statistics show that the chlorophyll data is highly positively (right) skewed, as shown both by the skewness value and by the large gap between the maximum and the mean.
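To make the skewness number concrete, here is a plain-Python sketch of the moment coefficient of skewness, m3 / m2^(3/2), computed from population (divide-by-n) central moments. We believe this is the definition behind Spark's `skewness` aggregate, but that is an assumption worth checking against the Spark documentation. The readings are hypothetical, and `math.fsum` is used because the notebook's wildcard import shadows the built-in `sum`.

```python
import math

def population_skewness(values):
    """Moment coefficient of skewness m3 / m2**1.5 with population (divide-by-n) moments."""
    n = len(values)
    if n == 0:
        raise ValueError("need at least one value")
    mean = math.fsum(values) / n
    m2 = math.fsum((x - mean) ** 2 for x in values) / n
    m3 = math.fsum((x - mean) ** 3 for x in values) / n
    if m2 == 0:
        return float("nan")  # skewness is undefined for constant data
    return m3 / m2 ** 1.5

# Hypothetical chlorophyll readings (ug/L): most are small, one is very large
readings = [0.05, 0.1, 0.2, 0.3, 0.4, 8.0]
print(population_skewness(readings))  # positive: long right tail
```

A symmetric sample such as `[1.0, 2.0, 3.0]` gives 0.0, while a single large outlier pulls the value well above zero, which is the pattern described for R_CHLA.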

#### Define chlorophyll category levels: low, medium, high

##### As displayed in Figure 4 of this article, https://www.nature.com/scitable/knowledge/library/the-biological-productivity-of-the-ocean-70631104/, chlorophyll concentrations in the ocean can be split into three category levels: low, medium, and high. Writing x for the R_CHLA value, our low category is x < 0.1 µg/L, medium is 0.1 <= x <= 1 µg/L, and high is x > 1 µg/L.

#### Map chlorophyll levels to the defined categories, add a Target column with the category label, and output summary statistics

In [None]:
lowThreshold = .1
medThreshold = 1

In [None]:
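# Label each row with a chlorophyll category: 0 = low, 1 = medium, 2 = high.
# Column.between is inclusive, so exactly 0.1 and exactly 1 ug/L are both "medium".
# otherwise(10) flags rows that match no branch (e.g. a null R_CHLA).
bottle = bottle.withColumn(
    'Target',
    when(col("R_CHLA") < lowThreshold, 0)
    .when(col("R_CHLA").between(lowThreshold, medThreshold), 1)
    .when(col("R_CHLA") > medThreshold, 2)
    .otherwise(10)
)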
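As a sanity check on the category boundaries, the `when`/`otherwise` chain can be mirrored in plain Python. `chla_category` is a hypothetical helper, not part of the notebook; it models a null R_CHLA as `None`, but it does not model Spark's NaN semantics, which differ from Python's.

```python
LOW_THRESHOLD = 0.1  # ug/L, same as lowThreshold
MED_THRESHOLD = 1.0  # ug/L, same as medThreshold

def chla_category(r_chla):
    """Plain-Python mirror of the Target when/otherwise chain (hypothetical helper)."""
    if r_chla is None:
        return 10  # a null R_CHLA matches no when() branch, so otherwise(10) applies
    if r_chla < LOW_THRESHOLD:
        return 0   # low
    if LOW_THRESHOLD <= r_chla <= MED_THRESHOLD:
        return 1   # medium; Column.between is inclusive at both ends
    if r_chla > MED_THRESHOLD:
        return 2   # high
    return 10      # fallback, like otherwise(10)

for value in [0.05, 0.1, 1.0, 1.5, None]:
    print(value, chla_category(value))
```

The boundary values are the ones worth checking: 0.1 and 1.0 both land in the medium class because the `between` branch is evaluated before `> medThreshold`.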

In [None]:
bottle.groupby('Target').count().show()

#### Find near-duplicate (highly correlated) columns and analyse whether columns with nulls can be dropped without losing much variance

In [None]:
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

In [None]:
# Schema of the result table; a correlation is a double, not an integer
schema = StructType([
    StructField('col1', StringType(), False),
    StructField('col2', StringType(), False),
    StructField('correlation', DoubleType(), False)
])

# Visit each unordered column pair once (j > i) and collect pairs whose
# Pearson correlation exceeds the threshold; assumes all columns are numeric
corrThreshold = 0.9
pairs = []
cols = bottle.columns
for i in range(len(cols)):
    for j in range(i + 1, len(cols)):
        correlation = bottle.corr(cols[i], cols[j])
        if correlation > corrThreshold:
            pairs.append((cols[i], cols[j], float(correlation)))

# Build the result DataFrame once instead of calling union inside the loop
highCorrs = spark.createDataFrame(pairs, schema)
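The correlation loop above relies on Pearson correlation between pairs of columns. The plain-Python sketch below shows the same idea on a few made-up values for R_NO3, R_PO4, and R_TEMP (hypothetical numbers, not CalCOFI measurements); `pearson` and `high_correlations` are illustrative helpers, not Spark APIs. Like the loop, it keeps only positive correlations above 0.9, so the strongly negative temperature/nutrient pairs in this toy data are not reported; comparing the absolute value instead would catch them.

```python
import math
from itertools import combinations

def pearson(xs, ys):
    """Pearson correlation of two equal-length numeric sequences."""
    n = len(xs)
    mx = math.fsum(xs) / n
    my = math.fsum(ys) / n
    cov = math.fsum((x - mx) * (y - my) for x, y in zip(xs, ys))
    sx = math.sqrt(math.fsum((x - mx) ** 2 for x in xs))
    sy = math.sqrt(math.fsum((y - my) ** 2 for y in ys))
    if sx == 0 or sy == 0:
        return float("nan")  # undefined when a column is constant
    return cov / (sx * sy)

def high_correlations(columns, threshold=0.9):
    """(col1, col2, r) for each unordered pair of columns with r > threshold."""
    pairs = []
    for a, b in combinations(sorted(columns), 2):  # each pair once, never a column with itself
        r = pearson(columns[a], columns[b])
        if r > threshold:  # NaN compares False, so constant columns are skipped
            pairs.append((a, b, r))
    return pairs

# Hypothetical values, not CalCOFI measurements
toy_columns = {
    "R_NO3": [0.0, 5.0, 15.0, 25.0, 30.0],
    "R_PO4": [0.3, 0.7, 1.4, 2.1, 2.5],
    "R_TEMP": [18.0, 14.0, 10.0, 8.0, 7.5],
}
for a, b, r in high_correlations(toy_columns):
    print(a, b, f"{r:.3f}")
```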

In [None]:
highCorrs.show()

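Because R_PO4 has 998 nulls and is highly correlated with R_NO3 and R_SIO3, we can drop it while losing little variance. R_SVA also contains nulls and is highly correlated with R_TEMP, so we will drop it as well. The cell below also drops R_O2 and R_SIO3, keeping their highly correlated counterparts R_O2Sat and R_NO3, and drops R_CHLA: Target was derived from R_CHLA, so keeping it as a feature would leak the label.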

In [None]:
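# Keep one column from each highly correlated pair; withColumnRenamed returns a
# new DataFrame, so the result must be assigned for the rename to take effect
bottle = bottle.drop('R_O2')
bottle = bottle.withColumnRenamed('R_O2Sat', 'R_O2, R_O2Sat')
bottle = bottle.drop('R_SIO3')
bottle = bottle.withColumnRenamed('R_NO3', 'R_SIO3, R_NO3')
# Drop the raw chlorophyll value: Target is derived from it, so it would leak the label
bottle = bottle.drop('R_CHLA')
bottle.show(2)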

## Models

#### Split the data 80/10/10 into train, test, and validation sets

In [None]:
# Check the schema again before assembling the feature vector
bottle.printSchema()

In [None]:
from pyspark.ml.feature import VectorAssembler

# Every column except the label becomes a feature
features = [c for c in bottle.columns if c != 'Target']
bottle = VectorAssembler(inputCols=features, outputCol="features_unscaled").transform(bottle)

In [None]:
bottle.show(2, truncate=False)

In [None]:
seed = 42
train, test, validation = bottle.randomSplit([0.80, 0.10, 0.10], seed=seed)
print('Train dataset count:', train.count())
print('Test dataset count:', test.count())
print('Validation dataset count:', validation.count())
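`randomSplit` normalizes the weights if they do not sum to 1 and assigns rows to splits by random sampling, so the counts printed above are close to, but generally not exactly, 80/10/10. The plain-Python sketch below illustrates that idea with one uniform draw per row compared against cumulative weights. It is an illustration under that assumption, not Spark's implementation (which samples within each partition), and `random_split` is a hypothetical helper.

```python
import math
import random

def random_split(rows, weights, seed=42):
    """Assign each row to exactly one split using one uniform draw per row.

    Weights are normalized to sum to 1; split sizes are only approximately proportional.
    """
    total = math.fsum(weights)
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)       # cumulative upper bound for this split
    bounds[-1] = 1.0             # guard against floating-point round-off
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()         # uniform in [0.0, 1.0)
        for k, bound in enumerate(bounds):
            if u < bound:
                splits[k].append(row)
                break
    return splits

sizes = [len(s) for s in random_split(range(10_000), [0.8, 0.1, 0.1])]
print(sizes)  # close to, but usually not exactly, [8000, 1000, 1000]
```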

#### Scale Data

In [None]:
from pyspark.ml.feature import StandardScaler

standardScaler = StandardScaler(withMean=True, withStd=True, inputCol='features_unscaled', outputCol='features')
ss = standardScaler.fit(train)

In [None]:
print('StandardScaler Means:', ss.mean)
print('StandardScaler StDevs:', ss.std)
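For intuition about what `ss.mean` and `ss.std` hold, the plain-Python sketch below fits per-column means and standard deviations on training rows only and then applies them to new rows, as the notebook does with `train` versus `test` and `validation`. It divides by n - 1, which is how we understand Spark's `StandardScaler` computes its standard deviation (the corrected sample standard deviation); mapping a zero-variance column to 0.0 is a choice of this sketch. The helper names and rows are hypothetical.

```python
import math

def fit_standard_scaler(rows):
    """Per-column means and corrected (n - 1) sample standard deviations of the training rows."""
    n = len(rows)
    if n < 2:
        raise ValueError("need at least two rows")
    d = len(rows[0])
    means = [math.fsum(r[j] for r in rows) / n for j in range(d)]
    stds = [math.sqrt(math.fsum((r[j] - means[j]) ** 2 for r in rows) / (n - 1))
            for j in range(d)]
    return means, stds

def standardize(rows, means, stds):
    """Apply (x - mean) / std column-wise; this sketch maps a zero-std column to 0.0."""
    return [[(x - m) / s if s > 0 else 0.0 for x, m, s in zip(r, means, stds)]
            for r in rows]

# Hypothetical two-feature rows
train_rows = [[1.0, 10.0], [2.0, 20.0], [3.0, 30.0]]
means, stds = fit_standard_scaler(train_rows)   # fit on training rows only
print(means, stds)                              # [2.0, 20.0] [1.0, 10.0]
print(standardize([[4.0, 0.0]], means, stds))   # [[2.0, -2.0]]
```

Fitting on the training split alone keeps statistics from the test and validation rows out of the model.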

In [None]:
trainscaled = ss.transform(train)
testscaled = ss.transform(test)
valscaled = ss.transform(validation)

#### Check for Imbalance in datasets

In [None]:
trainscaled.groupby('Target').count().show()

In [None]:
testscaled.groupby('Target').count().show()

In [None]:
valscaled.groupby('Target').count().show()

Each split contains similar proportions of each Target class, so the random split preserved the label distribution across the train, test, and validation sets.
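Comparing proportions rather than raw counts makes this check easier to read, since the splits differ in size. Note that similar proportions across splits is not the same as balanced classes: in the toy labels below, class 1 dominates every split. This plain-Python sketch is illustrative; `class_proportions` and `max_gap` are hypothetical helpers and the labels are made up.

```python
import math
from collections import Counter

def class_proportions(labels):
    """Fraction of rows in each class, keyed by label."""
    counts = Counter(labels)
    n = len(labels)
    return {label: counts[label] / n for label in sorted(counts)}

def max_gap(p, q):
    """Largest absolute difference in class proportion between two splits."""
    gap = 0.0
    for label in set(p) | set(q):
        diff = math.fabs(p.get(label, 0.0) - q.get(label, 0.0))
        if diff > gap:
            gap = diff
    return gap

# Hypothetical Target labels for two splits
train_p = class_proportions([0, 1, 1, 1, 2, 2, 1, 1])
val_p = class_proportions([1, 1, 2, 0])
print(train_p)  # {0: 0.125, 1: 0.625, 2: 0.25}
print(val_p)    # {0: 0.25, 1: 0.5, 2: 0.25}
print(max_gap(train_p, val_p))  # 0.125
```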

### Logistic Regression

#### Cross-validation, checking whether the data can predict the labels, and train and validation accuracy scores

In [None]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [None]:
# Parameter Tuning
# regParams = [0.001, 0.002, 0.003, 0.004]
# elasticNetParams = [0.5, 0.6, 0.7]

# for r in regParams:
#     for e in elasticNetParams:
#         lr = LogisticRegression(featuresCol='features', labelCol='Target', predictionCol='prediction', maxIter=10, regParam=r, elasticNetParam=e)
#         lrModel = lr.fit(trainscaled)
#         print('regParam', r, 'elasticNetParam', e, 'accuracy', lrModel.summary.accuracy)

In [None]:
lr = LogisticRegression(featuresCol='features', labelCol='Target', predictionCol='prediction', \
                        maxIter=10, regParam=0.001, elasticNetParam=0.6)
lrModel = lr.fit(trainscaled)

# Print the coefficients and intercept for multinomial logistic regression
print("Coefficients: \n" + str(lrModel.coefficientMatrix))
print("Intercept: " + str(lrModel.interceptVector))

trainingSummary = lrModel.summary

# for multiclass, we can inspect metrics on a per-label basis
print("False positive rate by label:")
for i, rate in enumerate(trainingSummary.falsePositiveRateByLabel):
    print("label %d: %s" % (i, rate))

print("True positive rate by label:")
for i, rate in enumerate(trainingSummary.truePositiveRateByLabel):
    print("label %d: %s" % (i, rate))

print("Precision by label:")
for i, prec in enumerate(trainingSummary.precisionByLabel):
    print("label %d: %s" % (i, prec))

print("Recall by label:")
for i, rec in enumerate(trainingSummary.recallByLabel):
    print("label %d: %s" % (i, rec))

print("F-measure by label:")
for i, f in enumerate(trainingSummary.fMeasureByLabel()):
    print("label %d: %s" % (i, f))
    
print('-------------------------------------------')

accuracy = trainingSummary.accuracy
falsePositiveRate = trainingSummary.weightedFalsePositiveRate
truePositiveRate = trainingSummary.weightedTruePositiveRate
fMeasure = trainingSummary.weightedFMeasure()
precision = trainingSummary.weightedPrecision
recall = trainingSummary.weightedRecall
print("Accuracy: %s\nFalse Positive Rate: %s\nTrue Positive Rate: %s\nF-measure: %s\nPrecision: %s\nRecall: %s"
      % (accuracy, falsePositiveRate, truePositiveRate, fMeasure, precision, recall))

In [None]:
predictVal = lrModel.evaluate(valscaled)
accuracy = predictVal.accuracy
falsePositiveRate = predictVal.weightedFalsePositiveRate
truePositiveRate = predictVal.weightedTruePositiveRate
fMeasure = predictVal.weightedFMeasure()
precision = predictVal.weightedPrecision
recall = predictVal.weightedRecall

print("Accuracy: %s\nFalse Positive Rate: %s\nTrue Positive Rate: %s\nF-measure: %s\nPrecision: %s\nRecall: %s"
      % (accuracy, falsePositiveRate, truePositiveRate, fMeasure, precision, recall))
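The weighted metrics printed above average the per-label metrics with weights proportional to each label's count among the true labels. The plain-Python sketch below reproduces that arithmetic on a hypothetical handful of labels so the numbers can be checked by hand; `weighted_metrics` is an illustrative helper, not a Spark API. One consequence worth knowing when reading the output: with these weights, weighted recall (and weighted true positive rate) works out to exactly the overall accuracy, because the sum of (n_k / n)(tp_k / n_k) over labels is just the total true positives divided by n.

```python
import math
from collections import Counter

def weighted_metrics(labels, predictions):
    """Accuracy plus precision, recall, and F1 averaged with weights = true label frequency."""
    n = len(labels)
    true_counts = Counter(labels)        # support of each true label
    pred_counts = Counter(predictions)   # how often each label was predicted
    hits = Counter(y for y, p in zip(labels, predictions) if y == p)  # true positives
    w_prec = w_rec = w_f1 = 0.0
    for label, support in true_counts.items():
        tp = hits[label]
        precision = tp / pred_counts[label] if pred_counts[label] else 0.0
        recall = tp / support
        f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
        weight = support / n
        w_prec += weight * precision
        w_rec += weight * recall
        w_f1 += weight * f1
    accuracy = math.fsum(hits.values()) / n
    return {"accuracy": accuracy, "precision": w_prec, "recall": w_rec, "f1": w_f1}

# Hypothetical labels and predictions for the three Target classes
m = weighted_metrics([0, 0, 1, 1, 2, 2], [0, 1, 1, 1, 2, 0])
print(m)
```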

In [None]:
# Logistic regression with 5-fold cross-validation. Not yet verified end to end, so it stays
# commented out. A likely cause of the earlier failure: the evaluator's labelCol defaulted to
# 'label', but the label column here is 'Target'. The grid is also large:
# 125 parameter combinations x 5 folds.

# lr = LogisticRegression(featuresCol='features', labelCol='Target', predictionCol='Prediction')

# paramGrid = (ParamGridBuilder()
#              .addGrid(lr.regParam, [0.001, 0.01, 0.1, 1.0, 10.0])
#              .addGrid(lr.elasticNetParam, [0.0, 0.25, 0.5, 0.75, 1.0])
#              .addGrid(lr.maxIter, [1, 5, 10, 20, 50])
#              .build())

# evaluator = MulticlassClassificationEvaluator(labelCol='Target', predictionCol='Prediction', metricName='accuracy')

# cv = CrossValidator(estimator=lr, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5, parallelism=2)
# cvmodel = cv.fit(trainscaled)

# predictTrain = cvmodel.transform(trainscaled)
# predictVal = cvmodel.transform(valscaled)

# print("Train accuracy: {}".format(evaluator.evaluate(predictTrain)))
# print("Validation accuracy: {}".format(evaluator.evaluate(predictVal)))
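To gauge the cost of that grid, and what k-fold cross-validation does with the rows, here is a plain-Python sketch. `kfold_indices` is a hypothetical helper using a simple shuffle-and-slice fold assignment, not Spark's exact fold-splitting; the grid values are copied from the commented-out cell. As we understand `CrossValidator`, it also refits the best parameter combination on the full training set afterward, adding one more fit.

```python
import random
from itertools import product

def kfold_indices(n, k, seed=42):
    """Shuffle row indices 0..n-1 and yield (train_idx, val_idx) for each of k folds."""
    idx = list(range(n))
    random.Random(seed).shuffle(idx)
    folds = [idx[i::k] for i in range(k)]   # k disjoint, roughly equal-sized folds
    for i in range(k):
        train_idx = [j for f, fold in enumerate(folds) if f != i for j in fold]
        yield train_idx, folds[i]

# Grid from the commented-out cell: 5 regParam x 5 elasticNetParam x 5 maxIter values
grid = list(product([0.001, 0.01, 0.1, 1.0, 10.0],
                    [0.0, 0.25, 0.5, 0.75, 1.0],
                    [1, 5, 10, 20, 50]))
num_folds = 5
print(len(grid), "combinations x", num_folds, "folds =", len(grid) * num_folds, "fits")
# 125 combinations x 5 folds = 625 fits
```

On the single local core configured at the top of the notebook, a smaller grid (for example, fixing maxIter) would cut the run time considerably.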

### Decision Tree

#### Can we increase accuracy from previous steps? Can we figure out which features matter the most?

In [None]:
from pyspark.ml.classification import DecisionTreeClassifier

In [None]:
dt = DecisionTreeClassifier(featuresCol="features", labelCol="Target", seed=seed)
dtModel = dt.fit(trainscaled)
trainResult = dtModel.transform(trainscaled)
valResult = dtModel.transform(valscaled)
print(trainResult.columns)

In [None]:
def calculate_accuracy(df):
    # Fraction of rows where the predicted class matches the Target label
    totalRows = df.count()
    correct = df.filter(df["Target"] == df["prediction"]).count()
    accuracy = correct / totalRows
    error = 1 - accuracy
    print('Accuracy: ', accuracy)
    print('Error Rate: ', error)

In [None]:
print("Train:")
calculate_accuracy(trainResult)
print("Validation:")
calculate_accuracy(valResult)

In [None]:
print("Model Depth:", dtModel.depth)
print("Model Number of Nodes:", dtModel.numNodes)

In [None]:
print('feature importance', dtModel.featureImportances)
print('feature col', dtModel.featuresCol)
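`dtModel.featureImportances` is a vector whose i-th entry belongs to the i-th column passed to the `VectorAssembler`, i.e. `features[i]`, so pairing the two lists makes the output readable. The plain-Python helper below is hypothetical, not a Spark API; in the notebook it could be called as `rank_importances(features, dtModel.featureImportances.toArray().tolist(), top=10)`. The importance values in the example are made up.

```python
def rank_importances(feature_names, importances, top=None):
    """Pair feature names with importances, sorted from most to least important."""
    if len(feature_names) != len(importances):
        raise ValueError("feature_names and importances must have the same length")
    ranked = sorted(zip(feature_names, importances), key=lambda kv: kv[1], reverse=True)
    return ranked if top is None else ranked[:top]

# Hypothetical importances for a few real CalCOFI column names
print(rank_importances(["T_degC", "Salnty", "O2ml_L"], [0.2, 0.5, 0.3]))
# [('Salnty', 0.5), ('O2ml_L', 0.3), ('T_degC', 0.2)]
```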

#### Stop Spark session

In [None]:
#spark.stop()