In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder\
        .appName("HDFS to Spark") \
        .getOrCreate()



In [4]:
import pyspark
import pandas as pd
from pyspark.sql import functions as F
from pyspark.ml.feature import Imputer
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import mean,round

In [5]:
df  = spark.read.csv("hdfs://localhost:54310/user/healthcare/Liver_Patient.csv", header=True, inferSchema=True)

In [6]:
df.show()

+------+------------------+---------------------+---------------+----------------+-----------------------------+------------------------------+-------------------------------+--------------+------------+------------------------------------+---------+----------+
|    ID|Age of the patient|Gender of the patient|Total Bilirubin|Direct Bilirubin|�Alkphos Alkaline Phosphotase|�Sgpt Alamine Aminotransferase|Sgot Aspartate Aminotransferase|Total Protiens|�ALB Albumin|A/G Ratio Albumin and Globulin Ratio|City_Code|State_Code|
+------+------------------+---------------------+---------------+----------------+-----------------------------+------------------------------+-------------------------------+--------------+------------+------------------------------------+---------+----------+
|ID_001|                65|               Female|            0.7|             0.1|                          187|                            16|                             18|           6.8|         3.3|           

In [7]:
liverpatientdf = df.withColumnRenamed("Age of the patient", "age").withColumnRenamed("Gender of the patient", "gender").withColumnRenamed("�Alkphos Alkaline Phosphotase", "alkphos_alkaline_phosphotase").withColumnRenamed("�Sgpt Alamine Aminotransferase", "sgpt_alamine_aminotransferase").withColumnRenamed("�ALB Albumin", "alb_albumin")

In [8]:
print(liverpatientdf.columns)

['ID', 'age', 'gender', 'Total Bilirubin', 'Direct Bilirubin', 'alkphos_alkaline_phosphotase', 'sgpt_alamine_aminotransferase', 'Sgot Aspartate Aminotransferase', 'Total Protiens', 'alb_albumin', 'A/G Ratio Albumin and Globulin Ratio', 'City_Code', 'State_Code']


In [9]:
liverpatientdf = liverpatientdf.withColumnRenamed("Total Bilirubin","total_bilirubin").withColumnRenamed("Direct Bilirubin","direct_bilirubin").withColumnRenamed("Total Protines","total_protiens").withColumnRenamed("Sgot Aspartate Aminotransfera","sgot_aspartate_aminotransfera").withColumnRenamed("A/G Ratio Albumin","a/g_ratio_albumin").withColumnRenamed("Globulin Ratio","globulin_ratio")

In [10]:
print(liverpatientdf.columns)

['ID', 'age', 'gender', 'total_bilirubin', 'direct_bilirubin', 'alkphos_alkaline_phosphotase', 'sgpt_alamine_aminotransferase', 'Sgot Aspartate Aminotransferase', 'Total Protiens', 'alb_albumin', 'A/G Ratio Albumin and Globulin Ratio', 'City_Code', 'State_Code']


In [11]:
liverpatientdf = liverpatientdf.withColumnRenamed("Sgot Aspartate Aminotransferase","sgot_aspartate_aminotransferase").withColumnRenamed("Total Protiens","total_patients")

In [12]:
print(liverpatientdf.columns)

['ID', 'age', 'gender', 'total_bilirubin', 'direct_bilirubin', 'alkphos_alkaline_phosphotase', 'sgpt_alamine_aminotransferase', 'sgot_aspartate_aminotransferase', 'total_patients', 'alb_albumin', 'A/G Ratio Albumin and Globulin Ratio', 'City_Code', 'State_Code']


In [13]:
liverpatientdf = liverpatientdf.withColumnRenamed("ID","id")

In [14]:
liverpatientdf = liverpatientdf.withColumnRenamed("A/G Ratio Albumin and Globulin Ratio","a/g_ratio_albumin_and_globulin_ratio")

In [15]:
print(liverpatientdf.columns)

['id', 'age', 'gender', 'total_bilirubin', 'direct_bilirubin', 'alkphos_alkaline_phosphotase', 'sgpt_alamine_aminotransferase', 'sgot_aspartate_aminotransferase', 'total_patients', 'alb_albumin', 'a/g_ratio_albumin_and_globulin_ratio', 'City_Code', 'State_Code']


In [16]:

liverpatientdf.toPandas()

Unnamed: 0,id,age,gender,total_bilirubin,direct_bilirubin,alkphos_alkaline_phosphotase,sgpt_alamine_aminotransferase,sgot_aspartate_aminotransferase,total_patients,alb_albumin,a/g_ratio_albumin_and_globulin_ratio,City_Code,State_Code
0,ID_001,65.0,Female,0.7,0.1,187.0,16.0,18.0,6.8,3.3,0.90,CT01,ST15
1,ID_002,62.0,Male,10.9,5.5,699.0,64.0,100.0,7.5,3.2,0.74,CT28,ST11
2,ID_003,62.0,Male,7.3,4.1,490.0,60.0,68.0,7.0,3.3,0.89,CT12,ST28
3,ID_004,58.0,Male,1.0,0.4,182.0,14.0,20.0,6.8,3.4,1.00,CT21,ST27
4,ID_005,72.0,Male,3.9,2.0,195.0,27.0,59.0,7.3,2.4,0.40,CT03,ST24
5,ID_006,46.0,Male,1.8,0.7,208.0,19.0,14.0,7.6,4.4,1.30,CT06,ST22
6,ID_007,26.0,Female,0.9,0.2,154.0,,12.0,7.0,3.5,1.00,CT06,ST22
7,ID_008,29.0,Female,0.9,0.3,202.0,14.0,11.0,6.7,3.6,1.10,CT02,ST15
8,ID_009,17.0,Male,0.9,0.3,202.0,22.0,19.0,7.4,4.1,1.20,CT26,ST15
9,ID_010,55.0,Male,0.7,0.2,290.0,53.0,58.0,6.8,3.4,1.00,CT10,ST12


In [17]:
liverpatientdf.select("id",'age',"total_bilirubin")

DataFrame[id: string, age: int, total_bilirubin: double]

In [18]:
("total row count "+str(liverpatientdf.count()))

'total row count 30677'

##### Checking null values

In [19]:

liverpatientdfnull = liverpatientdf.select([F.sum(F.col(c).isNull().cast("int")).alias(c) for c in liverpatientdf.columns])

In [55]:
liverpatientdfnull.toPandas().T

Unnamed: 0,0
id,0
age,2
gender,898
total_bilirubin,644
direct_bilirubin,558
alkphos_alkaline_phosphotase,797
sgpt_alamine_aminotransferase,538
sgot_aspartate_aminotransferase,463
total_patients,464
alb_albumin,494


#### encoding 

In [21]:
print(liverpatientdf.columns)

['id', 'age', 'gender', 'total_bilirubin', 'direct_bilirubin', 'alkphos_alkaline_phosphotase', 'sgpt_alamine_aminotransferase', 'sgot_aspartate_aminotransferase', 'total_patients', 'alb_albumin', 'a/g_ratio_albumin_and_globulin_ratio', 'City_Code', 'State_Code']


In [22]:
liverpatientdf = liverpatientdf.withColumn("Gender", 
                                           F.when(liverpatientdf["Gender"] == "Male", 1)
                                           .otherwise(0))

In [23]:
liverpatientdf.select("gender").show()

+------+
|gender|
+------+
|     0|
|     1|
|     1|
|     1|
|     1|
|     1|
|     0|
|     0|
|     1|
|     1|
|     1|
|     1|
|     1|
|     0|
|     1|
|     1|
|     1|
|     1|
|     0|
|     0|
+------+
only showing top 20 rows



## info 

In [56]:
liverpatientdf.describe().toPandas().T

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
id,30675,,,ID_001,ID_9999
age,30675,44.1079706601467,15.981066421425584,4,90
Gender,30675,0.7163814180929096,0.45076125135706696,0,1
total_bilirubin,30031,3.3703672871366432,6.256591914605603,0.4,75.0
direct_bilirubin,30117,1.528080486103988,2.870101128279074,0.1,19.7
alkphos_alkaline_phosphotase,29878,289.06148336568714,238.57336073801036,63,2110
sgpt_alamine_aminotransferase,30137,81.48219796263729,182.1906845502062,10,2000
sgot_aspartate_aminotransferase,30212,111.44528664106977,280.9056025027297,10,4929
total_patients,30211,6.480341597431365,1.0821077253454834,2.7,9.6


In [25]:
liverpatientdf

DataFrame[id: string, age: int, Gender: int, total_bilirubin: double, direct_bilirubin: double, alkphos_alkaline_phosphotase: int, sgpt_alamine_aminotransferase: int, sgot_aspartate_aminotransferase: int, total_patients: double, alb_albumin: double, a/g_ratio_albumin_and_globulin_ratio: double, City_Code: string, State_Code: string]

In [26]:
column_types = liverpatientdf.dtypes
print(column_types)

[('id', 'string'), ('age', 'int'), ('Gender', 'int'), ('total_bilirubin', 'double'), ('direct_bilirubin', 'double'), ('alkphos_alkaline_phosphotase', 'int'), ('sgpt_alamine_aminotransferase', 'int'), ('sgot_aspartate_aminotransferase', 'int'), ('total_patients', 'double'), ('alb_albumin', 'double'), ('a/g_ratio_albumin_and_globulin_ratio', 'double'), ('City_Code', 'string'), ('State_Code', 'string')]


In [27]:
numerical_columns = [col_name for col_name,col_type in column_types if col_type in ["int","bigint","float","double"]]
numerical_df = liverpatientdf.select(*numerical_columns)
numerical_df.show()

+---+------+---------------+----------------+----------------------------+-----------------------------+-------------------------------+--------------+-----------+------------------------------------+
|age|Gender|total_bilirubin|direct_bilirubin|alkphos_alkaline_phosphotase|sgpt_alamine_aminotransferase|sgot_aspartate_aminotransferase|total_patients|alb_albumin|a/g_ratio_albumin_and_globulin_ratio|
+---+------+---------------+----------------+----------------------------+-----------------------------+-------------------------------+--------------+-----------+------------------------------------+
| 65|     0|            0.7|             0.1|                         187|                           16|                             18|           6.8|        3.3|                                 0.9|
| 62|     1|           10.9|             5.5|                         699|                           64|                            100|           7.5|        3.2|                                0

In [28]:
print(numerical_df.columns)

['age', 'Gender', 'total_bilirubin', 'direct_bilirubin', 'alkphos_alkaline_phosphotase', 'sgpt_alamine_aminotransferase', 'sgot_aspartate_aminotransferase', 'total_patients', 'alb_albumin', 'a/g_ratio_albumin_and_globulin_ratio']


In [29]:
# Define feature columns (excluding the column with missing values)
feature_columns = numerical_df.columns[1:]
print(feature_columns)

['Gender', 'total_bilirubin', 'direct_bilirubin', 'alkphos_alkaline_phosphotase', 'sgpt_alamine_aminotransferase', 'sgot_aspartate_aminotransferase', 'total_patients', 'alb_albumin', 'a/g_ratio_albumin_and_globulin_ratio']


In [30]:
df_trainingnull = numerical_df.select([F.sum(F.col(c).isNull().cast("int")).alias(c) for c in numerical_df.columns])
df_trainingnull.show()

+---+------+---------------+----------------+----------------------------+-----------------------------+-------------------------------+--------------+-----------+------------------------------------+
|age|Gender|total_bilirubin|direct_bilirubin|alkphos_alkaline_phosphotase|sgpt_alamine_aminotransferase|sgot_aspartate_aminotransferase|total_patients|alb_albumin|a/g_ratio_albumin_and_globulin_ratio|
+---+------+---------------+----------------+----------------------------+-----------------------------+-------------------------------+--------------+-----------+------------------------------------+
|  2|     0|            644|             558|                         797|                          538|                            463|           464|        494|                                 559|
+---+------+---------------+----------------+----------------------------+-----------------------------+-------------------------------+--------------+-----------+---------------------------------

In [31]:
numerical_df = numerical_df.na.drop(subset=['age'])

In [32]:
columns_to_fill = feature_columns

In [33]:
mean_values = numerical_df.agg(*(round(mean(c), 2).alias(c) for c in columns_to_fill))
mean_values_dict = mean_values.first().asDict()

In [34]:
for column in columns_to_fill:
    numerical_df = numerical_df.fillna(mean_values_dict[column], subset=[column])

In [35]:
numerical_df.show()

+---+------+---------------+----------------+----------------------------+-----------------------------+-------------------------------+--------------+-----------+------------------------------------+
|age|Gender|total_bilirubin|direct_bilirubin|alkphos_alkaline_phosphotase|sgpt_alamine_aminotransferase|sgot_aspartate_aminotransferase|total_patients|alb_albumin|a/g_ratio_albumin_and_globulin_ratio|
+---+------+---------------+----------------+----------------------------+-----------------------------+-------------------------------+--------------+-----------+------------------------------------+
| 65|     0|            0.7|             0.1|                         187|                           16|                             18|           6.8|        3.3|                                 0.9|
| 62|     1|           10.9|             5.5|                         699|                           64|                            100|           7.5|        3.2|                                0

In [36]:
numerical_null = numerical_df.select([F.sum(F.col(c).isNull().cast("int")).alias(c) for c in numerical_df.columns])
numerical_null.show()

+---+------+---------------+----------------+----------------------------+-----------------------------+-------------------------------+--------------+-----------+------------------------------------+
|age|Gender|total_bilirubin|direct_bilirubin|alkphos_alkaline_phosphotase|sgpt_alamine_aminotransferase|sgot_aspartate_aminotransferase|total_patients|alb_albumin|a/g_ratio_albumin_and_globulin_ratio|
+---+------+---------------+----------------+----------------------------+-----------------------------+-------------------------------+--------------+-----------+------------------------------------+
|  0|     0|              0|               0|                           0|                            0|                              0|             0|          0|                                   0|
+---+------+---------------+----------------+----------------------------+-----------------------------+-------------------------------+--------------+-----------+---------------------------------

In [37]:
liverpatientdf = liverpatientdf.na.drop(subset=['age'])
liverpatientdf.show()

+------+---+------+---------------+----------------+----------------------------+-----------------------------+-------------------------------+--------------+-----------+------------------------------------+---------+----------+
|    id|age|Gender|total_bilirubin|direct_bilirubin|alkphos_alkaline_phosphotase|sgpt_alamine_aminotransferase|sgot_aspartate_aminotransferase|total_patients|alb_albumin|a/g_ratio_albumin_and_globulin_ratio|City_Code|State_Code|
+------+---+------+---------------+----------------+----------------------------+-----------------------------+-------------------------------+--------------+-----------+------------------------------------+---------+----------+
|ID_001| 65|     0|            0.7|             0.1|                         187|                           16|                             18|           6.8|        3.3|                                 0.9|     CT01|      ST15|
|ID_002| 62|     1|           10.9|             5.5|                         699|   

In [38]:
liverpatient_null = liverpatientdf.select([F.sum(F.col(c).isNull().cast("int")).alias(c) for c in liverpatientdf.columns])
liverpatient_null.show()

+---+---+------+---------------+----------------+----------------------------+-----------------------------+-------------------------------+--------------+-----------+------------------------------------+---------+----------+
| id|age|Gender|total_bilirubin|direct_bilirubin|alkphos_alkaline_phosphotase|sgpt_alamine_aminotransferase|sgot_aspartate_aminotransferase|total_patients|alb_albumin|a/g_ratio_albumin_and_globulin_ratio|City_Code|State_Code|
+---+---+------+---------------+----------------+----------------------------+-----------------------------+-------------------------------+--------------+-----------+------------------------------------+---------+----------+
|  0|  0|     0|            644|             558|                         797|                          538|                            463|           464|        494|                                 559|        0|         0|
+---+---+------+---------------+----------------+----------------------------+------------------

In [39]:
string_columns = [col_name for col_name,col_type in column_types if col_type in ["string"]]
string_df = liverpatientdf.select(*string_columns)
string_df.show()

+------+---------+----------+
|    id|City_Code|State_Code|
+------+---------+----------+
|ID_001|     CT01|      ST15|
|ID_002|     CT28|      ST11|
|ID_003|     CT12|      ST28|
|ID_004|     CT21|      ST27|
|ID_005|     CT03|      ST24|
|ID_006|     CT06|      ST22|
|ID_007|     CT06|      ST22|
|ID_008|     CT02|      ST15|
|ID_009|     CT26|      ST15|
|ID_010|     CT10|      ST12|
|ID_011|     CT21|      ST27|
|ID_012|     CT25|      ST01|
|ID_013|     CT04|      ST24|
|ID_014|     CT20|      ST28|
|ID_015|     CT16|      ST23|
|ID_016|     CT07|      ST13|
|ID_017|     CT01|      ST15|
|ID_018|     CT17|      ST07|
|ID_019|     CT09|      ST34|
|ID_020|     CT19|      ST12|
+------+---------+----------+
only showing top 20 rows



In [40]:
string_null = string_df.select([F.sum(F.col(c).isNull().cast("int")).alias(c) for c in string_df.columns])
string_null.show()

+---+---------+----------+
| id|City_Code|State_Code|
+---+---------+----------+
|  0|        0|         0|
+---+---------+----------+



In [41]:
merged_df = string_df.crossJoin(numerical_df)

In [42]:
merged_df.show()

+------+---------+----------+---+------+---------------+----------------+----------------------------+-----------------------------+-------------------------------+--------------+-----------+------------------------------------+
|    id|City_Code|State_Code|age|Gender|total_bilirubin|direct_bilirubin|alkphos_alkaline_phosphotase|sgpt_alamine_aminotransferase|sgot_aspartate_aminotransferase|total_patients|alb_albumin|a/g_ratio_albumin_and_globulin_ratio|
+------+---------+----------+---+------+---------------+----------------+----------------------------+-----------------------------+-------------------------------+--------------+-----------+------------------------------------+
|ID_001|     CT01|      ST15| 65|     0|            0.7|             0.1|                         187|                           16|                             18|           6.8|        3.3|                                 0.9|
|ID_001|     CT01|      ST15| 62|     1|           10.9|             5.5|           

In [43]:
#merged_df.write.format("csv").option("header", "true").mode("overwrite").save("merge_df")

In [44]:
print(merged_df.columns)

['id', 'City_Code', 'State_Code', 'age', 'Gender', 'total_bilirubin', 'direct_bilirubin', 'alkphos_alkaline_phosphotase', 'sgpt_alamine_aminotransferase', 'sgot_aspartate_aminotransferase', 'total_patients', 'alb_albumin', 'a/g_ratio_albumin_and_globulin_ratio']


In [52]:
merged_df = merged_df.withColumnRenamed("City_Code","city_code").withColumnRenamed("State_Code","state_code").withColumnRenamed("Gender","gender").withColumnRenamed("total_patients","total_protiens")

In [57]:
print(merged_df.columns)

['id', 'city_code', 'state_code', 'age', 'gender', 'total_bilirubin', 'direct_bilirubin', 'alkphos_alkaline_phosphotase', 'sgpt_alamine_aminotransferase', 'sgot_aspartate_aminotransferase', 'total_protiens', 'alb_albumin', 'a/g_ratio_albumin_and_globulin_ratio']


In [51]:
merged_df.show()

+------+---------+----------+---+------+---------------+----------------+----------------------------+-----------------------------+-------------------------------+--------------+-----------+------------------------------------+
|    id|city_code|state_code|age|gender|total_bilirubin|direct_bilirubin|alkphos_alkaline_phosphotase|sgpt_alamine_aminotransferase|sgot_aspartate_aminotransferase|total_patients|alb_albumin|a/g_ratio_albumin_and_globulin_ratio|
+------+---------+----------+---+------+---------------+----------------+----------------------------+-----------------------------+-------------------------------+--------------+-----------+------------------------------------+
|ID_001|     CT01|      ST15| 65|     0|            0.7|             0.1|                         187|                           16|                             18|           6.8|        3.3|                                 0.9|
|ID_001|     CT01|      ST15| 62|     1|           10.9|             5.5|           

In [59]:

merged_null = merged_df.select([F.sum(F.col(c).isNull().cast("int")).alias(c) for c in merged_df.columns])
merged_null.show()

+---+---------+----------+---+------+---------------+----------------+----------------------------+-----------------------------+-------------------------------+--------------+-----------+------------------------------------+
| id|city_code|state_code|age|gender|total_bilirubin|direct_bilirubin|alkphos_alkaline_phosphotase|sgpt_alamine_aminotransferase|sgot_aspartate_aminotransferase|total_protiens|alb_albumin|a/g_ratio_albumin_and_globulin_ratio|
+---+---------+----------+---+------+---------------+----------------+----------------------------+-----------------------------+-------------------------------+--------------+-----------+------------------------------------+
|  0|        0|         0|  0|     0|              0|               0|                           0|                            0|                              0|             0|          0|                                   0|
+---+---------+----------+---+------+---------------+----------------+--------------------------

In [61]:
merged_null.toPandas().T

Unnamed: 0,0
id,0
city_code,0
state_code,0
age,0
gender,0
total_bilirubin,0
direct_bilirubin,0
alkphos_alkaline_phosphotase,0
sgpt_alamine_aminotransferase,0
sgot_aspartate_aminotransferase,0
