<a href="https://colab.research.google.com/github/lihaolisa/BDHteam48project/blob/master/rf.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [0]:
from google.colab import drive
drive.mount('/content/gdrive')

Drive already mounted at /content/gdrive; to attempt to forcibly remount, call drive.mount("/content/gdrive", force_remount=True).


In [0]:
#reference medium.com/@rmache/big-data-with-spark-in-google-colab-7c046e24b3

In [0]:
# Install spark-related dependencies
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://apache.osuosl.org/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
!tar xf spark-2.4.5-bin-hadoop2.7.tgz

!pip install -q findspark
!pip install pyspark
# Set up required environment variables

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.5-bin-hadoop2.7"



In [0]:
# Tools we need to connect to the Spark server, load our data,
# clean it and prepare it
from pyspark import SparkContext
from pyspark.sql import SparkSession

from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer, VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

from pyspark.sql.functions import isnan, when, count, col
# Set up constants
mpdata = "/content/gdrive/My Drive/Colab/mp_data_6hr.csv" 
APP_NAME = "mp_data_6hr"
SPARK_URL = "local[*]"
RANDOM_SEED = 42
TRAINING_DATA_RATIO = 0.8
RF_NUM_TREES = 10
RF_MAX_DEPTH = 10
RF_NUM_BINS = 32

In [0]:
# Connect to the Spark server

spark = SparkSession.builder.appName(APP_NAME).master(SPARK_URL).getOrCreate()

# Load datasets

df = spark.read.options(header="true",inferschema = "true").csv(mpdata)

In [0]:
print(f"The shape is {df.count():d} rows by {len(df.columns):d} columns.")

The shape is 49741 rows by 131 columns.


In [0]:
# make sure no null values
null_counts = df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c)for c in df.columns]).toPandas().to_dict(orient='records')
print(f"We have {sum(null_counts[0].values()):d} null values in this dataset.")

We have 3304695 null values in this dataset.


In [0]:
columns_to_drop = ['subject_id', 'hadm_id', 'icustay_id', 'dbsource', 'expire_flag', 'deathtime_hours', 'hosp_deathtime_hours']
df = df.drop(*columns_to_drop)

In [0]:
df.show()

+--------------------+--------+------+--------------------+--------------+-----------+-----------------+------------------+-----------------+------------------+------------------+------------------+-----------------+------------------+-------------+---------+----------+----------+------------+---------+--------+-----------+-------------+---------+----------+----------+------------+-----------------+--------+-----------+--------+-------------+--------------+------------+------------------+-------+------------+-------------+-----------+-----------------+-------+------------+-------------+-----------+-----------------+---------------+----------------------+------------------+------------------+------------------+------------------+------------------+------------------+-------------+------------+----------+----------------+--------------+------------------+---------------+-------------+------------------+---------------+------------------+-------------+------------------+------------------

In [0]:
impute_col = [c for c in df.columns if c not in {'hospital_expire_flag', 'age', 'gender', 'ethnicity', 'admission_type', 'icustay_num'}]
#impute_col
df_impute = df.select(impute_col)
for col_name in df_impute.columns:
    df_impute = df_impute.withColumn(col_name, col(col_name).cast('float'))
df_impute.show()

+--------------+----------+-----------+-----------+-------------+----------+---------+------------+-------------+---------+----------+----------+------------+---------+--------+-----------+-------------+---------+----------+----------+------------+---------+--------+-----------+--------+-------------+--------------+------------+------------------+-------+------------+-------------+-----------+-----------------+-------+------------+-------------+-----------+-----------------+---------------+----------------------+------------------+---------+---------+---------+------------------+-------------+-------------+------------+----------+----------------+--------------+------------+---------------+-------------+---------------+---------------+------------+-------------+--------------+--------+--------+-----------+--------+--------+--------------+---------------------+-----------------+-------+--------+------+-----------------+------------+------------+-----------+---------+---------------+----

In [0]:
from pyspark.ml.feature import Imputer

imputer = Imputer(
    inputCols= df_impute.columns, 
    outputCols= df_impute.columns
)
df_imputed = imputer.fit(df_impute).transform(df_impute)
df_imputed.show()

+--------------+----------+-----------+-----------+-------------+----------+---------+------------+-------------+---------+----------+----------+------------+---------+---------+-----------+-------------+---------+----------+----------+------------+---------+--------+-----------+---------+-------------+--------------+------------+------------------+---------+------------+-------------+-----------+-----------------+---------+------------+-------------+-----------+-----------------+---------------+----------------------+------------------+---------+---------+---------+------------------+-------------+-------------+------------+----------+----------------+--------------+------------+---------------+-------------+---------------+---------------+------------+-------------+--------------+---------+---------+-----------+---------+--------+--------------+---------------------+-----------------+---------+---------+--------+-----------------+------------+------------+-----------+---------+------

In [0]:
from pyspark.sql.functions import monotonically_increasing_id
df_original = df.select('hospital_expire_flag', 'age', 'gender', 'ethnicity', 'admission_type', 'icustay_num')
df1 = df_original.withColumn("id", monotonically_increasing_id())
df2 = df_imputed.withColumn("id", monotonically_increasing_id())
df3 = df1.join(df2, "id", "outer").drop("id")

df3.show()

+--------------------+-------+------+--------------------+--------------+-----------+--------------+----------+-----------+-----------+-------------+----------+---------+------------+-------------+---------+----------+----------+------------+---------+---------+-----------+-------------+---------+----------+----------+------------+---------+--------+-----------+---------+-------------+--------------+------------+------------------+---------+------------+-------------+-----------+-----------------+---------+------------+-------------+-----------+-----------------+---------------+----------------------+------------------+---------+---------+---------+------------------+-------------+-------------+------------+----------+----------------+--------------+------------+---------------+-------------+---------------+---------------+------------+-------------+--------------+---------+---------+-----------+---------+--------+--------------+---------------------+-----------------+---------+-------

In [0]:
df = df3.na.drop()
print(f"The shape is {df.count():d} rows by {len(df.columns):d} columns.")

The shape is 49741 rows by 124 columns.


In [0]:
features = [c for c in df.columns if c not in {'hospital_expire_flag'}]
df = VectorAssembler(inputCols=features, outputCol="features").transform(df)
df.select("hospital_expire_flag", "features").show(5)

IllegalArgumentException: ignored