In [1]:
# Installing pyspark on the current Colab session

!pip install pyspark



In [2]:
# Importing libraries

from google.colab import drive
from pyspark.sql import SparkSession
import os

In [3]:
# Creating (or reusing, if one already exists) the Spark session named 'TRI'

spark = (
    SparkSession.builder
    .appName('TRI')
    .getOrCreate()
)

In [4]:
# Displaying the SparkSession object as the cell output
spark

In [5]:
# Mounting Google Drive at /content/drive so its files can be read from this session

drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [6]:
# Changing the notebook's working directory to the project folder on the mounted drive

%cd "/content/drive/My Drive/pyspark"

/content/drive/My Drive/pyspark


In [7]:
# Listing the files/folders inside the current working directory

!ls

pyspark.ipynb  US_1a_2018.csv


In [8]:
# Saving the current working directory path; it is used below to build the CSV path

dir_path = os.getcwd()

In [9]:
# Reading the TRI 2018 File 1a: first row is the header, fields are
# comma-separated, and column types are inferred from the data

csv_path = f'{dir_path}/US_1a_2018.csv'
df_news = spark.read.csv(csv_path, header=True, sep=',', inferSchema=True)

In [10]:
# Showing the first 20 rows (20 is also the default of show())

df_news.show(n=20)

+---------+--------------+----------------------+-------------------+----------------------------+---------------------------+-----------------------------------------+-----------+---------------+--------------------+--------------------+----------------+--------------------+--------------+-----------------+--------+----------+--------------------+--------------------+----------------+-------------+----------------+----------------+-------------------+--------------------+--------------------+-----------------+--------------------------+-------------------+--------------------+------------------------+--------------------+----------------+----------+----------+----------+----------+----------+------------+------------------+------------+------------+------------+------------+------------+------------------+-------------------+------------+------------+------------+------------+---------+---------+---------+---------+---------+---------+---------+---------+----------+----------+--------

In [11]:
# Checking the type of the object bound to df_news

type(df_news)

pyspark.sql.dataframe.DataFrame

In [12]:
# Returning the first 3 records as a list of Row objects

df_news.head(n=3)

[Row(FORM TYPE='R', REPORTING YEAR='2018', TRADE SECRET INDICATOR='NO', SANITIZED INDICATOR='NO', TITLE OF CERTIFYING OFFICIAL='OWNER/ PRESIDENT', NAME OF CERTIFYING OFFICIAL='GARY WADE', CERTIFYING OFFICIAL'S SIGNATURE INDICATOR='ELECTRONIC', DATE SIGNED='2019-06-19', TRIFID='47421MCMTLRR17P', FACILITY NAME='RECYCLING SERVICES OF INDIANA', FACILITY STREET='4635 PEERLESS RD', FACILITY CITY='BEDFORD', FACILITY COUNTY='LAWRENCE', FACILITY STATE='IN', FACILITY ZIP CODE=47421, BIA CODE=None, TRIBE NAME=None, MAILING NAME='RECYCLING SERVICES OF INDIANA', MAILING STREET='4635 PEERLESS RD', MAILING CITY='BEDFORD', MAILING STATE='IN', MAILING PROVINCE=None, MAILING ZIP CODE='47421', ENTIRE FACILITY IND='YES', PARTIAL FACILITY IND='NO', FEDERAL FACILITY IND='NO', GOCO FACILITY IND='NO', ASSIGNED FED FACILITY FLAG='NO', PUBLIC CONTACT NAME='RACHAEL CHASTAIN', PUBLIC CONTACT PHONE='8122798114', PUBLIC CONTACT PHONE EXT=None, PUBLIC CONTACT EMAIL='RCHASTAIN@RSIONLINE.ORG', PRIMARY SIC CODE=None, S

In [13]:
# Printing the schema (column names, inferred types and nullability) as a tree

df_news.printSchema()

root
 |-- FORM TYPE: string (nullable = true)
 |-- REPORTING YEAR: string (nullable = true)
 |-- TRADE SECRET INDICATOR: string (nullable = true)
 |-- SANITIZED INDICATOR: string (nullable = true)
 |-- TITLE OF CERTIFYING OFFICIAL: string (nullable = true)
 |-- NAME OF CERTIFYING OFFICIAL: string (nullable = true)
 |-- CERTIFYING OFFICIAL'S SIGNATURE INDICATOR: string (nullable = true)
 |-- DATE SIGNED: string (nullable = true)
 |-- TRIFID: string (nullable = true)
 |-- FACILITY NAME: string (nullable = true)
 |-- FACILITY STREET: string (nullable = true)
 |-- FACILITY CITY: string (nullable = true)
 |-- FACILITY COUNTY: string (nullable = true)
 |-- FACILITY STATE: string (nullable = true)
 |-- FACILITY ZIP CODE: integer (nullable = true)
 |-- BIA CODE: double (nullable = true)
 |-- TRIBE NAME: string (nullable = true)
 |-- MAILING NAME: string (nullable = true)
 |-- MAILING STREET: string (nullable = true)
 |-- MAILING CITY: string (nullable = true)
 |-- MAILING STATE: string (nullab

In [14]:
# Getting the column names as a Python list

df_news.columns

['FORM TYPE',
 'REPORTING YEAR',
 'TRADE SECRET INDICATOR',
 'SANITIZED INDICATOR',
 'TITLE OF CERTIFYING OFFICIAL',
 'NAME OF CERTIFYING OFFICIAL',
 "CERTIFYING OFFICIAL'S SIGNATURE INDICATOR",
 'DATE SIGNED',
 'TRIFID',
 'FACILITY NAME',
 'FACILITY STREET',
 'FACILITY CITY',
 'FACILITY COUNTY',
 'FACILITY STATE',
 'FACILITY ZIP CODE',
 'BIA CODE',
 'TRIBE NAME',
 'MAILING NAME',
 'MAILING STREET',
 'MAILING CITY',
 'MAILING STATE',
 'MAILING PROVINCE',
 'MAILING ZIP CODE',
 'ENTIRE FACILITY IND',
 'PARTIAL FACILITY IND',
 'FEDERAL FACILITY IND',
 'GOCO FACILITY IND',
 'ASSIGNED FED FACILITY FLAG',
 'PUBLIC CONTACT NAME',
 'PUBLIC CONTACT PHONE',
 'PUBLIC CONTACT PHONE EXT',
 'PUBLIC CONTACT EMAIL',
 'PRIMARY SIC CODE',
 'SIC CODE 2',
 'SIC CODE 3',
 'SIC CODE 4',
 'SIC CODE 5',
 'SIC CODE 6',
 'NAICS ORIGIN',
 'PRIMARY NAICS CODE',
 'NAICS CODE 2',
 'NAICS CODE 3',
 'NAICS CODE 4',
 'NAICS CODE 5',
 'NAICS CODE 6',
 'LATITUDE',
 'LONGITUDE',
 'D and B NR A',
 'D and B NR B',
 'RCRA N

In [15]:
# Selecting a few identifying columns and showing the first 3 rows

preview_cols = ['TRIFID', 'CAS NUMBER', 'CHEMICAL NAME', 'MAXIMUM AMOUNT ON-SITE']
df_news.select(preview_cols).show(n=3)

+---------------+----------+--------------------+----------------------+
|         TRIFID|CAS NUMBER|       CHEMICAL NAME|MAXIMUM AMOUNT ON-SITE|
+---------------+----------+--------------------+----------------------+
|47421MCMTLRR17P|      N150|DIOXIN AND DIOXIN...|                  13.0|
|90670PGPND13429| 007697372|         NITRIC ACID|                   4.0|
|46201MTCHL1841L| 007440473|            CHROMIUM|                   4.0|
+---------------+----------+--------------------+----------------------+
only showing top 3 rows



In [16]:
# Checking the data type of each column as (name, type) pairs

df_news.dtypes

[('FORM TYPE', 'string'),
 ('REPORTING YEAR', 'string'),
 ('TRADE SECRET INDICATOR', 'string'),
 ('SANITIZED INDICATOR', 'string'),
 ('TITLE OF CERTIFYING OFFICIAL', 'string'),
 ('NAME OF CERTIFYING OFFICIAL', 'string'),
 ("CERTIFYING OFFICIAL'S SIGNATURE INDICATOR", 'string'),
 ('DATE SIGNED', 'string'),
 ('TRIFID', 'string'),
 ('FACILITY NAME', 'string'),
 ('FACILITY STREET', 'string'),
 ('FACILITY CITY', 'string'),
 ('FACILITY COUNTY', 'string'),
 ('FACILITY STATE', 'string'),
 ('FACILITY ZIP CODE', 'int'),
 ('BIA CODE', 'double'),
 ('TRIBE NAME', 'string'),
 ('MAILING NAME', 'string'),
 ('MAILING STREET', 'string'),
 ('MAILING CITY', 'string'),
 ('MAILING STATE', 'string'),
 ('MAILING PROVINCE', 'string'),
 ('MAILING ZIP CODE', 'string'),
 ('ENTIRE FACILITY IND', 'string'),
 ('PARTIAL FACILITY IND', 'string'),
 ('FEDERAL FACILITY IND', 'string'),
 ('GOCO FACILITY IND', 'string'),
 ('ASSIGNED FED FACILITY FLAG', 'string'),
 ('PUBLIC CONTACT NAME', 'string'),
 ('PUBLIC CONTACT PHONE'

In [17]:
# Computing summary statistics for the columns and displaying them

summary_stats = df_news.describe()
summary_stats.show()

+-------+--------------------+------------------+----------------------+-------------------+----------------------------+---------------------------+-----------------------------------------+-----------+---------------+--------------------+--------------------+-------------+---------------+--------------+------------------+------------------+--------------------+--------------------+--------------------+------------+-------------+----------------+------------------+-------------------+--------------------+--------------------+-----------------+--------------------------+--------------------+--------------------+------------------------+--------------------+--------------------+----------+----------+----------+----------+----------+------------+------------------+-----------------+-----------------+------------------+------------------+-----------------+-------------------+------------------+-------------------+--------------------+------------+------------+------------+------------+---

In [18]:
# Adding a column that flags rows where ON-SITE - TREATED > 1 units/yr.
# The result is only displayed; df_news itself is left unchanged.

treated_gt_one = df_news['ON-SITE - TREATED'] > 1
df_news.withColumn('ON-SITE - TREATED > 1 units/yr', treated_gt_one).show()

+---------+--------------+----------------------+-------------------+----------------------------+---------------------------+-----------------------------------------+-----------+---------------+--------------------+--------------------+----------------+--------------------+--------------+-----------------+--------+----------+--------------------+--------------------+----------------+-------------+----------------+----------------+-------------------+--------------------+--------------------+-----------------+--------------------------+-------------------+--------------------+------------------------+--------------------+----------------+----------+----------+----------+----------+----------+------------+------------------+------------+------------+------------+------------+------------+------------------+-------------------+------------+------------+------------+------------+---------+---------+---------+---------+---------+---------+---------+---------+----------+----------+--------

In [19]:
# Dropping one column (display only; df_news is not reassigned)

without_sanitized = df_news.drop('SANITIZED INDICATOR')
without_sanitized.show()

+---------+--------------+----------------------+----------------------------+---------------------------+-----------------------------------------+-----------+---------------+--------------------+--------------------+----------------+--------------------+--------------+-----------------+--------+----------+--------------------+--------------------+----------------+-------------+----------------+----------------+-------------------+--------------------+--------------------+-----------------+--------------------------+-------------------+--------------------+------------------------+--------------------+----------------+----------+----------+----------+----------+----------+------------+------------------+------------+------------+------------+------------+------------+------------------+-------------------+------------+------------+------------+------------+---------+---------+---------+---------+---------+---------+---------+---------+----------+----------+----------+----------+------

In [20]:
# Renaming 'FORM TYPE' to 'FORM' and showing one row (display only)

renamed = df_news.withColumnRenamed('FORM TYPE', 'FORM')
renamed.show(n=1)

+----+--------------+----------------------+-------------------+----------------------------+---------------------------+-----------------------------------------+-----------+---------------+--------------------+----------------+-------------+---------------+--------------+-----------------+--------+----------+--------------------+----------------+------------+-------------+----------------+----------------+-------------------+--------------------+--------------------+-----------------+--------------------------+-------------------+--------------------+------------------------+--------------------+----------------+----------+----------+----------+----------+----------+------------+------------------+------------+------------+------------+------------+------------+------------------+---------+------------+------------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+----------+----------+----------+----------+----------+----------+-----

In [21]:
# Dropping rows based on the recycling methods reported by facilities.
# Only rows with at least one non-null recycling method value are kept.
#
# thresh=1 means "keep a row if it has at least 1 non-null value among the
# subset columns". The previous how="any" argument was removed: when thresh
# is given it overrides `how`, so how="any" had no effect and contradicted
# the intent stated above.

recycling_method_cols = [f'ON-SITE RECYCLING PROCESSES METHOD {i}' for i in range(1, 8)]

df_news.na.drop(thresh=1, subset=recycling_method_cols).show()

+---------+--------------+----------------------+-------------------+----------------------------+---------------------------+-----------------------------------------+-----------+---------------+--------------------+--------------------+----------------+---------------+--------------+-----------------+--------+----------+--------------------+--------------------+----------------+-------------+----------------+----------------+-------------------+--------------------+--------------------+-----------------+--------------------------+-------------------+--------------------+------------------------+--------------------+----------------+----------+----------+----------+----------+----------+------------+------------------+------------+------------+------------+------------+------------+------------------+-------------------+------------+------------+------------+------------+---------+---------+---------+---------+---------+---------+---------+---------+----------+----------+----------+--

In [22]:
# Replacing nulls in ON-SITE RECYCLING PROCESSES METHOD 1 with a placeholder
# and showing only that column (display only; df_news is not reassigned)

method_col = 'ON-SITE RECYCLING PROCESSES METHOD 1'
filled = df_news.na.fill('No record', [method_col])
filled.select(method_col).show()

+------------------------------------+
|ON-SITE RECYCLING PROCESSES METHOD 1|
+------------------------------------+
|                           No record|
|                           No record|
|                           No record|
|                           No record|
|                           No record|
|                           No record|
|                           No record|
|                           No record|
|                           No record|
|                           No record|
|                           No record|
|                           No record|
|                                 H10|
|                           No record|
|                           No record|
|                           No record|
|                           No record|
|                           No record|
|                           No record|
|                           No record|
+------------------------------------+
only showing top 20 rows



In [23]:
# Configuring a mean imputer for the three on-site quantity columns.
# Each input column gets a companion output column suffixed with "_imputed".

from pyspark.ml.feature import Imputer

impute_cols = ['ON-SITE - RECYCLED', 'ON-SITE - TREATED', 'ON-SITE - ENERGY RECOVERY']
imputer = Imputer(
    inputCols=impute_cols,
    outputCols=[f"{name}_imputed" for name in impute_cols],
)
imputer = imputer.setStrategy("mean")

In [24]:
# Fitting the imputer on df_news, then adding the imputed columns and displaying the result

imputer_model = imputer.fit(df_news)
imputer_model.transform(df_news).show()

+---------+--------------+----------------------+-------------------+----------------------------+---------------------------+-----------------------------------------+-----------+---------------+--------------------+--------------------+----------------+--------------------+--------------+-----------------+--------+----------+--------------------+--------------------+----------------+-------------+----------------+----------------+-------------------+--------------------+--------------------+-----------------+--------------------------+-------------------+--------------------+------------------------+--------------------+----------------+----------+----------+----------+----------+----------+------------+------------------+------------+------------+------------+------------+------------+------------------+-------------------+------------+------------+------------+------------+---------+---------+---------+---------+---------+---------+---------+---------+----------+----------+--------

In [25]:
# Keeping rows where at least one of the three on-site quantities is non-zero,
# then showing only those three columns

onsite_cols = ['ON-SITE - RECYCLED', 'ON-SITE - TREATED', 'ON-SITE - ENERGY RECOVERY']
any_nonzero = (
    (df_news['ON-SITE - RECYCLED'] != 0)
    | (df_news['ON-SITE - TREATED'] != 0)
    | (df_news['ON-SITE - ENERGY RECOVERY'] != 0)
)
df_news.filter(any_nonzero).select(onsite_cols).show()

+------------------+-----------------+-------------------------+
|ON-SITE - RECYCLED|ON-SITE - TREATED|ON-SITE - ENERGY RECOVERY|
+------------------+-----------------+-------------------------+
|               0.0|          14027.0|                      0.0|
|               0.0|           357.06|                      0.0|
|               0.0|          56541.0|                      0.0|
|               0.0|          7065.38|                      0.0|
|               0.0|        1696032.0|                      0.0|
|               0.0|        242633.13|                      0.0|
|               0.0|         282951.0|                      0.0|
|               0.0|        2773224.0|                3430455.0|
|               0.0|           4900.0|                      0.0|
|               0.0|        4000000.0|                      0.0|
|             320.0|            157.0|                      0.0|
|               0.0|           2337.0|                      0.0|
|          172000.0|     

In [26]:
# Totals of RECYCLED, TREATED and ENERGY RECOVERY per (TRIFID, CAS NUMBER) pair

group_keys = ['TRIFID', 'CAS NUMBER']
amount_cols = ['ON-SITE - RECYCLED', 'ON-SITE - TREATED', 'ON-SITE - ENERGY RECOVERY']
(
    df_news
    .select(amount_cols + group_keys)
    .groupBy(group_keys)
    .sum()
    .show()
)

+---------------+----------+-----------------------+----------------------+------------------------------+
|         TRIFID|CAS NUMBER|sum(ON-SITE - RECYCLED)|sum(ON-SITE - TREATED)|sum(ON-SITE - ENERGY RECOVERY)|
+---------------+----------+-----------------------+----------------------+------------------------------+
|41096GLLTNUS42W| 000554132|                    0.0|                   0.0|                           0.0|
|31647MCRFLROUTE| 060207901|                    0.0|                   0.0|                           0.0|
|55021KGMNF226PA| 000079005|                    0.0|                   0.0|                           0.0|
|37405SGNLM1201S| 007429905|              2473066.0|                   0.0|                           0.0|
|30680STPNCOLDHI| 000091203|                    0.0|                   0.0|                           0.0|
|15601LSC00RD3BO| 007440508|                    0.0|                   0.0|                           0.0|
|60178SYMRF917CR| 000110543|         

# Simple ML example with PySpark

In [139]:
# Importing some libraries

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.regression import DecisionTreeRegressor, RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql import Window
import pyspark.sql.functions as f

In [115]:
# Selecting the label column (ON-SITE - RECYCLED) and the four candidate feature columns

ml_cols = [
    'ON-SITE - RECYCLED',
    'CLASSIFICATION',
    'PRIMARY NAICS CODE',
    'METAL INDICATOR',
    'MAXIMUM AMOUNT ON-SITE',
]
df_ml = df_news.select(ml_cols)
df_ml.show(n=5)

+------------------+--------------+------------------+---------------+----------------------+
|ON-SITE - RECYCLED|CLASSIFICATION|PRIMARY NAICS CODE|METAL INDICATOR|MAXIMUM AMOUNT ON-SITE|
+------------------+--------------+------------------+---------------+----------------------+
|               0.0|        DIOXIN|            331314|             NO|                  13.0|
|               0.0|           TRI|            331492|             NO|                   4.0|
|               0.0|           TRI|            332710|            YES|                   4.0|
|               0.0|           PBT|            424710|             NO|                   3.0|
|               0.0|           TRI|            221112|            YES|                   3.0|
+------------------+--------------+------------------+---------------+----------------------+
only showing top 5 rows



In [116]:
# Keeping only records with non-zero recycling, then removing rows that
# contain nulls and exact duplicate rows

df_ml = (
    df_ml
    .filter(df_ml['ON-SITE - RECYCLED'] != 0)
    .na.drop()
    .dropDuplicates()
)

In [117]:
# For each combination of the four feature columns, keep only the row(s) whose
# recycled amount equals the maximum within that combination
w = Window.partitionBy(['CLASSIFICATION', 'PRIMARY NAICS CODE', 'METAL INDICATOR', 'MAXIMUM AMOUNT ON-SITE'])

group_max = f.max('ON-SITE - RECYCLED').over(w)
df_ml = (
    df_ml
    .withColumn('maxRECYCLED', group_max)
    .where(f.col('ON-SITE - RECYCLED') == f.col('maxRECYCLED'))
    .drop('maxRECYCLED')  # helper column is no longer needed after filtering
)

In [118]:
# Previewing the first 5 rows after the per-group maximum filter
df_ml.show(n=5)

+------------------+--------------+------------------+---------------+----------------------+
|ON-SITE - RECYCLED|CLASSIFICATION|PRIMARY NAICS CODE|METAL INDICATOR|MAXIMUM AMOUNT ON-SITE|
+------------------+--------------+------------------+---------------+----------------------+
|      5.49422408E7|           PBT|            331492|            YES|                   9.0|
|          315000.0|           TRI|            325130|             NO|                   6.0|
|             276.0|           PBT|            326199|             NO|                   2.0|
|           27429.0|           TRI|            331491|            YES|                   4.0|
|            1536.0|           PBT|            332813|            YES|                   2.0|
+------------------+--------------+------------------+---------------+----------------------+
only showing top 5 rows



In [119]:
# Dropping the DIOXIN classification for the example

not_dioxin = df_ml['CLASSIFICATION'] != 'DIOXIN'
df_ml = df_ml.filter(not_dioxin)
df_ml.show(n=5)

+------------------+--------------+------------------+---------------+----------------------+
|ON-SITE - RECYCLED|CLASSIFICATION|PRIMARY NAICS CODE|METAL INDICATOR|MAXIMUM AMOUNT ON-SITE|
+------------------+--------------+------------------+---------------+----------------------+
|      5.49422408E7|           PBT|            331492|            YES|                   9.0|
|          315000.0|           TRI|            325130|             NO|                   6.0|
|             276.0|           PBT|            326199|             NO|                   2.0|
|           27429.0|           TRI|            331491|            YES|                   4.0|
|            1536.0|           PBT|            332813|            YES|                   2.0|
+------------------+--------------+------------------+---------------+----------------------+
only showing top 5 rows



In [120]:
# Every feature is treated as categorical: it is string-indexed into
# '<feature>Index' and then one-hot encoded into '<feature>Vec'
features = ['CLASSIFICATION', 'PRIMARY NAICS CODE', 'METAL INDICATOR', 'MAXIMUM AMOUNT ON-SITE']

stages = []
for feature_name in features:
    index_col = f'{feature_name}Index'
    stages += [
        StringIndexer(inputCol=feature_name, outputCol=index_col),
        OneHotEncoder(inputCol=index_col, outputCol=f'{feature_name}Vec'),
    ]

# Fit all indexer/encoder stages on df_ml and apply them to the same data
pipeline = Pipeline(stages=stages)
df_ml_t = pipeline.fit(df_ml).transform(df_ml)

In [121]:
# Inspecting the indexed and one-hot encoded columns (first 20 rows)
df_ml_t.show(n=20)

+------------------+--------------+------------------+---------------+----------------------+-------------------+-----------------+-----------------------+---------------------+--------------------+------------------+---------------------------+-------------------------+
|ON-SITE - RECYCLED|CLASSIFICATION|PRIMARY NAICS CODE|METAL INDICATOR|MAXIMUM AMOUNT ON-SITE|CLASSIFICATIONIndex|CLASSIFICATIONVec|PRIMARY NAICS CODEIndex|PRIMARY NAICS CODEVec|METAL INDICATORIndex|METAL INDICATORVec|MAXIMUM AMOUNT ON-SITEIndex|MAXIMUM AMOUNT ON-SITEVec|
+------------------+--------------+------------------+---------------+----------------------+-------------------+-----------------+-----------------------+---------------------+--------------------+------------------+---------------------------+-------------------------+
|      5.49422408E7|           PBT|            331492|            YES|                   9.0|                1.0|        (1,[],[])|                    3.0|      (210,[3],[1.0])|       

In [122]:
# Assembling the per-feature one-hot vectors into one 'Independent_Features' vector column
encoded_cols = [f'{name}Vec' for name in features]
featureassembler = VectorAssembler(inputCols=encoded_cols, outputCol='Independent_Features')

In [123]:
# Appending the assembled feature vector column to the encoded data
output = featureassembler.transform(df_ml_t)

In [124]:
# Inspecting the assembled data (first 20 rows)
output.show(n=20)

+------------------+--------------+------------------+---------------+----------------------+-------------------+-----------------+-----------------------+---------------------+--------------------+------------------+---------------------------+-------------------------+--------------------+
|ON-SITE - RECYCLED|CLASSIFICATION|PRIMARY NAICS CODE|METAL INDICATOR|MAXIMUM AMOUNT ON-SITE|CLASSIFICATIONIndex|CLASSIFICATIONVec|PRIMARY NAICS CODEIndex|PRIMARY NAICS CODEVec|METAL INDICATORIndex|METAL INDICATORVec|MAXIMUM AMOUNT ON-SITEIndex|MAXIMUM AMOUNT ON-SITEVec|Independent_Features|
+------------------+--------------+------------------+---------------+----------------------+-------------------+-----------------+-----------------------+---------------------+--------------------+------------------+---------------------------+-------------------------+--------------------+
|      5.49422408E7|           PBT|            331492|            YES|                   9.0|                1.0|        

In [125]:
# Keeping only the label and the assembled feature vector for modelling
model_cols = ['ON-SITE - RECYCLED', 'Independent_Features']
finalized_data = output.select(model_cols)

In [126]:
# Inspecting the modelling data (first 20 rows)
finalized_data.show(n=20)

+------------------+--------------------+
|ON-SITE - RECYCLED|Independent_Features|
+------------------+--------------------+
|      5.49422408E7|(222,[4,211,220],...|
|          315000.0|(222,[0,66,216],[...|
|             276.0|(222,[26,215],[1....|
|           27429.0|(222,[0,81,211,21...|
|            1536.0|(222,[22,211,215]...|
|            195.92|(222,[142,211,213...|
|       1.7386151E7|(222,[42,211,216]...|
|             791.0|(222,[0,162,214],...|
|         5414375.0|(222,[0,1,216],[1...|
|       2.1193039E7|(222,[0,66,214],[...|
|           16020.0|(222,[0,128,213],...|
|            1000.0|(222,[112,211,212...|
|         9525697.0|(222,[0,54,212],[...|
|         3753966.0|(222,[0,14,211,21...|
|          177004.0|(222,[0,11,211,21...|
|           99549.0|(222,[0,34,214],[...|
|         9135202.0|(222,[0,45,214],[...|
|       5.9938446E7|(222,[0,25,220],[...|
|           322.466|(222,[0,29,212],[...|
|            1469.0|(222,[0,75,215],[...|
+------------------+--------------

In [127]:
# Splitting into ~75 % training and ~25 % test data.
# A fixed seed is passed so the split (and the RMSE values reported below) does
# not change on every run. NOTE(review): exact reproducibility presumably also
# requires the upstream DataFrame to be computed deterministically; confirm.
train_data, test_data = finalized_data.randomSplit([0.75, 0.25], seed=42)

In [144]:
# Decision tree regression: fit on the training split, score on the test split

# After this statement `regressor` refers to the fitted model, not the estimator
regressor = DecisionTreeRegressor(
    featuresCol='Independent_Features',
    labelCol='ON-SITE - RECYCLED',
).fit(train_data)
predictions = regressor.transform(test_data)

evaluator = RegressionEvaluator(
    labelCol='ON-SITE - RECYCLED',
    predictionCol='prediction',
    metricName='rmse',
)
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

Root Mean Squared Error (RMSE) on test data = 6.12697e+07


In [143]:
# Random forest regression: fit on the training split, score on the test split.
# Random forests are stochastic, so an explicit seed is passed to make the
# fitted model and the reported RMSE repeatable across runs.

regressor_rfr = RandomForestRegressor(featuresCol='Independent_Features',
                                      labelCol='ON-SITE - RECYCLED',
                                      seed=42)
# After fitting, `regressor_rfr` refers to the fitted model (same binding as before)
regressor_rfr = regressor_rfr.fit(train_data)
predictions_rfr = regressor_rfr.transform(test_data)
evaluator_rfr = RegressionEvaluator(labelCol='ON-SITE - RECYCLED',
                                    predictionCol='prediction',
                                    metricName='rmse')
rmse_rfr = evaluator_rfr.evaluate(predictions_rfr)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse_rfr)

Root Mean Squared Error (RMSE) on test data = 6.3327e+07
