In [1]:
import warnings
warnings.filterwarnings('ignore')

from hdfs import InsecureClient
import json
import re
import pyspark
import pyspark.sql.functions as F
from pyspark.sql.types import StructType
from pyspark.sql.window import Window
from pyspark.sql import SparkSession, DataFrame
from pyspark.conf import SparkConf
from functools import reduce

# Initializing Spark Session object

In [2]:
class Config:

    def __init__(self,
                elasticsearch_host,
                elasticsearch_port,
                elasticsearch_auth_user,
                elasticsearch_auth_pass,
                hdfs_datanode_host,
                hdfs_user):
        self.elasticsearch_conf = {
            'es.nodes': elasticsearch_host,
            'es.port': elasticsearch_port,
            'es.net.http.auth.user': elasticsearch_auth_user,
            'es.net.http.auth.pass': elasticsearch_auth_pass
        }
        self.hdfs_datanode_host = hdfs_datanode_host
        self.hdfs_user = hdfs_user
        

    def _initialize_spark_session(self,appName):
        spark_app = (SparkSession
                    .builder
                    .appName(appName)
                    .config("spark.jars","/home/jovyan/spark_conf/elasticsearch-hadoop-7.12.0/dist/elasticsearch-hadoop-7.12.0.jar")
                    .config("spark.driver.extraClassPath","/home/jovyan/spark_conf/elasticsearch-hadoop-7.12.0/dist/elasticsearch-hadoop-7.12.0.jar")
                    .getOrCreate())
        return spark_app

In [3]:
conf_options = {
    "spark.jars": "/home/jovyan/spark_conf/elasticsearch-hadoop-7.12.0/dist/elasticsearch-spark-20_2.11-7.12.0.jar",
    "spark.es.nodes": "elasticsearch",
    "spark.es.port": "9200"
}

In [4]:
config = Config(elasticsearch_host="elasticsearch",
            elasticsearch_port="9200",
            elasticsearch_auth_user="elastic",
            elasticsearch_auth_pass="elastic@1234",
            hdfs_datanode_host="hdfs://hadoop-filebeat:8020",
            hdfs_user="nifi")
spark = config._initialize_spark_session("SparkApplication")

# Extracting all files

In [5]:
client = InsecureClient("http://hadoop-filebeat:50070",user="nifi")
client.list('/dfs-data')

['05078bb9-c94a-4ae7-b0a1-0228aaff6353',
 '062727ce-119a-425b-b1b1-1c1c9cb7d0d0',
 '1a9a5114-62c8-402c-aa72-e76c3290afcd',
 '2874a389-ca57-4433-862d-cfc3f7f8bf4c',
 '369526be-1536-48e7-84fa-931d4be6bae7',
 '36c34784-0f24-44ae-9954-09ed6ec16ffb',
 '3a5fd590-5171-4e91-90f7-f6e20adb10e8',
 '3c6a0470-6fb3-45dc-b8a9-e1828750e92d',
 '423814b2-265b-4009-afdf-0b325b31caa0',
 '4a389804-517e-40f9-81a5-f9780554a06d',
 '53a7bdbc-381a-491c-98aa-5028834dce06',
 '59527149-639c-466b-bc10-aa787693e7b5',
 '904b3ecf-6b7b-4902-90d8-bfc4a897a55b',
 '970b76be-4c02-4d8f-bc4d-7029c333d018',
 '9f3776bf-cf16-486e-b0af-dd771c566f29',
 'b92e7567-5745-4d19-b590-1a091dc536b3',
 'ba367c79-7a5a-482f-a0dd-572b36b27b2f',
 'eca740eb-7fc6-4b0e-87f8-f48fb6017831',
 'eeba93c4-6d7c-4eec-a81b-7895e3b1792f',
 'fc0b8975-e474-44b4-876a-0729d9391bd4',
 'fe0180ac-a70d-4d35-9943-a360790d0a48',
 'fe7b1347-ea0e-4803-80e7-b09add6a02f9']

In [6]:
def shape(dataframe: pyspark.sql.DataFrame):
    return dataframe.count(), len(dataframe.columns)

def fetch_and_merge_files(path,config):
    """
        Generator Function to fetch files stored in the hdfs file system and lazily evaluate them when 
        concatenating them
    """
    directory_path = f"{config.hdfs_datanode_host}/{path}"
    

    def get_files(path):
        nonlocal directory_path
        for file in client.list(path):
            print(f"Processing file: {file}")
            yield spark.read.format("csv").option("delimiter",",").option("header","true").load(f"{directory_path}/{file}")
    
    def unionAll(*dfs):
        return reduce(DataFrame.unionAll,dfs)
    
    return unionAll(*(file for file in get_files(path)))

In [7]:
merged_file = fetch_and_merge_files("/dfs-data",config)

Processing file: 05078bb9-c94a-4ae7-b0a1-0228aaff6353
Processing file: 062727ce-119a-425b-b1b1-1c1c9cb7d0d0
Processing file: 1a9a5114-62c8-402c-aa72-e76c3290afcd
Processing file: 2874a389-ca57-4433-862d-cfc3f7f8bf4c
Processing file: 369526be-1536-48e7-84fa-931d4be6bae7
Processing file: 36c34784-0f24-44ae-9954-09ed6ec16ffb
Processing file: 3a5fd590-5171-4e91-90f7-f6e20adb10e8
Processing file: 3c6a0470-6fb3-45dc-b8a9-e1828750e92d
Processing file: 423814b2-265b-4009-afdf-0b325b31caa0
Processing file: 4a389804-517e-40f9-81a5-f9780554a06d
Processing file: 53a7bdbc-381a-491c-98aa-5028834dce06
Processing file: 59527149-639c-466b-bc10-aa787693e7b5
Processing file: 904b3ecf-6b7b-4902-90d8-bfc4a897a55b
Processing file: 970b76be-4c02-4d8f-bc4d-7029c333d018
Processing file: 9f3776bf-cf16-486e-b0af-dd771c566f29
Processing file: b92e7567-5745-4d19-b590-1a091dc536b3
Processing file: ba367c79-7a5a-482f-a0dd-572b36b27b2f
Processing file: eca740eb-7fc6-4b0e-87f8-f48fb6017831
Processing file: eeba93c4-6d

In [8]:
f"Shape of the PySpark dataframe before: {shape(merged_file)}"

'Shape of the PySpark dataframe before: (11626, 7)'

In [9]:
filtered_df = merged_file.dropDuplicates()
f"Shape of the PySpark dataframe after: {shape(filtered_df)}"

'Shape of the PySpark dataframe after: (1023, 7)'

In [10]:
filtered_df is merged_file #this shows that dropDuplicates() is indeed a transformation which results into a new DataFrame

False

# Transformation

In [11]:
def spark_convert_columns(data_df: pyspark.sql.DataFrame,
                          fill_anchor_column: str,*,
                          columns: tuple):
    """
        Helper function to fetch average of grouped dataframe in the PySpark DataFrame.
        In a two stage process, the function first converts the blank/negative values to `None` and then replaces the `None` values with the average values

        @params
        -------
        positional
            -> data_df: Input DataFrame
            -> fill_anchor_column: Anchor column to group the PySpark DataFrame by
        keyword
            -> columns: Columns to fill the average values with

    """
    #A thing to note: the data_df returned isn't the same as data_df
    #Spark DFs are immutable, so when the operations are performed to the DF, and when the variable assigned,
    #The local variable data_df just shifted its reference from the original DF, to the modified DF, preserving all the data in the original DF

    w = Window().partitionBy(fill_anchor_column)
    for column in columns:
        data_df = data_df.withColumn(column, \
                                     F.when(F.col(column)<0,None) \
                                      .otherwise(F.col(column)))
        data_df = data_df.withColumn(column, \
                                     F.when(F.col(column).isNull(), \
                                            F.avg(F.col(column)) \
                                             .over(w)) \
                                      .otherwise(F.col(column)))
        data_df = data_df.withColumn(column,F.col(column).cast("int"))
    return data_df


def spark_calc_percentages(data_df: pyspark.sql.DataFrame,
                           columns: tuple,
                           constants: tuple):

    for column,const in zip(columns,constants):
        data_df = data_df.withColumn(f"{column}_pct_change",F.floor((F.col(column)/const)*100))
    return data_df


def process_filtered_df(filtered_df: pyspark.sql.DataFrame, \
                        values_to_replace: tuple, \
                        target_values: tuple, \
                        cols_to_convert: tuple, \
                        cols_to_filterout: tuple, \
                        col_vals_to_filterout: tuple):
    """
        Function to process 
    """

    if len(values_to_replace) != len(target_values):
        raise ValueError(f"The length of the values to be replaced and the target values dont match: {len(values_to_replace)} and {len(target_values)}.")

    if len(cols_to_filterout) != len(col_vals_to_filterout):
        raise ValueError(f"The length of column names to filter out doesn't match the length of column values: {len(cols_to_filterout)} and {len(col_vals_to_filterout)}.")

    if "year" not in filtered_df.columns:
        raise ValueError(f"The column `year` is not present in the input dataframe.")


    filtered_df = filtered_df.replace(values_to_replace,target_values)
    for column,col_vals in zip(cols_to_filterout,col_vals_to_filterout):
        filtered_df = filtered_df[~getattr(filtered_df,column).isin(list(col_vals))]
    filtered_df = filtered_df.withColumn("year",F.col("year").cast("int"))
    filtered_df = spark_convert_columns(filtered_df,"year",columns=cols_to_convert)
    return filtered_df

In [12]:
values_to_replace = (".","M","F","Accidents Except Drug Posioning (V01-X39, X43, X45-X59, Y85-Y86)","Black Non-Hispanic","White Non-Hispanic")
target_values = (None,"Male","Female","Accidents Except Drug Poisoning (V01-X39, X43, X45-X59, Y85-Y86)","Non-Hispanic Black","Non-Hispanic White")
cols_to_filterout = ("race_ethnicity",)
col_vals_to_filterout = (('Not Stated/Unknown','Other Race/Ethnicity'),)
cols_to_convert = ("deaths","death_rate","age_adjusted_death_rate")

# filtered_df = filtered_df.replace(values_to_replace,target_values)
processed_df = process_filtered_df(filtered_df, \
                                   values_to_replace, \
                                   target_values, \
                                   cols_to_convert, \
                                   cols_to_filterout, \
                                   col_vals_to_filterout)
processed_df.cache()
processed_df.show(10)

+----+--------------------+------+--------------------+------+----------+-----------------------+
|year|       leading_cause|   sex|      race_ethnicity|deaths|death_rate|age_adjusted_death_rate|
+----+--------------------+------+--------------------+------+----------+-----------------------+
|2015|Alzheimer's Disea...|Female|  Non-Hispanic Black|   147|        14|                     10|
|2015|Essential Hyperte...|Female|Other Race/ Ethni...|     4|        53|                     49|
|2015|    All Other Causes|Female|            Hispanic|  1184|        92|                     97|
|2015|Mental and Behavi...|Female|            Hispanic|    78|         6|                      5|
|2015|Diabetes Mellitus...|  Male|Other Race/ Ethni...|     3|        53|                     49|
|2015|Chronic Lower Res...|  Male|  Non-Hispanic White|   364|        27|                     21|
|2015|Alzheimer's Disea...|Female|Asian and Pacific...|    50|         7|                      8|
|2015|Diseases of He

# Analysis

In [13]:
sex_analysis_pivot = processed_df \
                    .groupBy(["year"]) \
                    .pivot("sex") \
                    .sum("deaths") \
                    .orderBy("year")
sex_analysis_pivot.cache()


_,f_zero,m_zero,*_ = sex_analysis_pivot.first()
sex_analysis_pivot = spark_calc_percentages(sex_analysis_pivot,["Female","Male"],[f_zero,m_zero])
sex_analysis_pivot.show()

+----+------+-----+-----------------+---------------+
|year|Female| Male|Female_pct_change|Male_pct_change|
+----+------+-----+-----------------+---------------+
|2010| 17530| 7865|              100|            100|
|2011| 29544|27026|              168|            343|
|2012| 30130|27392|              171|            348|
|2013| 28265|27210|              161|            345|
|2014| 27626|25710|              157|            326|
|2015| 27076|26067|              154|            331|
|2016| 26769|26468|              152|            336|
|2017| 26765|26553|              152|            337|
+----+------+-----+-----------------+---------------+



- The above table suggests that for females, the total death counts shows a incline of 52% from 17530 to 26765, while for males the average death counts have more than tripled (surge of 237%), from 7865 to 26553. 
- This shows that the matter of concern is much higher in males as compared to females.

In [14]:
#Fetching the leading cause with the highest deaths year on year for both sexes

w = Window.partitionBy(["year","sex"])
processed_df.withColumn("maxdeaths", \
                        F.max("deaths").over(w)) \
                        .where(F.col('deaths')==F.col('maxdeaths')) \
                        .drop('maxdeaths') \
                        .sort(F.col("year").asc()) \
                        .show()

+----+--------------------+------+------------------+------+----------+-----------------------+
|year|       leading_cause|   sex|    race_ethnicity|deaths|death_rate|age_adjusted_death_rate|
+----+--------------------+------+------------------+------+----------+-----------------------+
|2010|Diseases of Heart...|Female|Non-Hispanic White|  5351|       374|                    189|
|2010|Diseases of Heart...|  Male|          Hispanic|  1354|       118|                    215|
|2011|Diseases of Heart...|Female|Non-Hispanic White|  5016|       354|                    179|
|2011|Diseases of Heart...|  Male|Non-Hispanic White|  4220|       316|                    260|
|2012|Diseases of Heart...|Female|Non-Hispanic White|  4719|       332|                    167|
|2012|Diseases of Heart...|  Male|Non-Hispanic White|  4156|       310|                    252|
|2013|Diseases of Heart...|Female|Non-Hispanic White|  4535|       319|                    160|
|2013|Diseases of Heart...|  Male|Non-Hi

Well this shows that the Heart Disease is the leading cause of death across the board. But it might be the case of an imbalanced dataset. So let's check that.

In [15]:
processed_df.groupBy("leading_cause") \
            .agg(F.sum(F.col("deaths")) \
                  .alias("deaths")) \
            .sort(F.col('deaths').desc()) \
            .show()

+--------------------+------+
|       leading_cause|deaths|
+--------------------+------+
|Diseases of Heart...|127387|
|Malignant Neoplas...| 97632|
|    All Other Causes| 77974|
|Influenza (Flu) a...| 17435|
|Cerebrovascular D...| 14032|
|Chronic Lower Res...| 13970|
|Diabetes Mellitus...| 13158|
|Essential Hyperte...|  8680|
|Accidents Except ...|  7594|
|Mental and Behavi...|  7123|
|Alzheimer's Disea...|  6113|
|Intentional Self-...|  3061|
|Assault (Homicide...|  2899|
|Human Immunodefic...|  2881|
|Chronic Liver Dis...|  1653|
|Certain Condition...|  1574|
|Septicemia (A40-A41)|  1033|
|Congenital Malfor...|  1004|
|Nephritis, Nephro...|   973|
|Aortic Aneurysm a...|   515|
+--------------------+------+
only showing top 20 rows



This shows that # of deaths by heart diseases is significantly larger than the others, causing an imbalance in the dataset. So lets try to create a pivot using the leading causes which will help us understand the changes in the # deaths in a year-on-year basis

### Leading cause analysis

In [16]:
leading_cause_pivot = processed_df \
                      .groupBy("year") \
                      .pivot("leading_cause") \
                      .sum("deaths") \
                      .orderBy("year") \
                      .na.fill(0)

In [17]:
def format_data_cols(data_df: pyspark.sql.DataFrame):
    """
        Helper function to format columns of PySpark DataFrame
    """
    previous_cols = data_df.columns
    formatted_cols = list(map(lambda item: item.replace(":","").replace(".",""), \
                              data_df.columns))
    return reduce(lambda data_df, idx: data_df.withColumnRenamed(previous_cols[idx],formatted_cols[idx]), \
                  range(len(previous_cols)), \
                  data_df)


def spark_calc_pct_change(data_df: pyspark.sql.DataFrame,orderby_column: str,columns: tuple=None):
    """
        Helper function to calculate the % change from previous values for each column in columns
    """
    w = Window.partitionBy().orderBy(orderby_column)    
    if not columns:
        columns = data_df.columns
        columns.remove(orderby_column)
    for column in columns:
        data_df = data_df.withColumn("prev_value", \
                                     F.lag(F.col(column)) \
                                      .over(w))
        data_df = data_df.withColumn(f"{column}_pct_change", \
                                     F.when((F.col("prev_value")==0)|(F.isnull(F.col("prev_value")))|(F.isnull(F.col(column)-F.col("prev_value"))),0) \
                                      .otherwise(F.round((F.col(column)-F.col("prev_value"))/F.col("prev_value")*100,2))) \
                         .drop("prev_value")
    
    return data_df

In [18]:
leading_cause_pivot = format_data_cols(leading_cause_pivot)
leading_cause_pivot_pct_change = spark_calc_pct_change(leading_cause_pivot,"year")
leading_cause_pivot_pct_change.cache()

DataFrame[year: int, Accidents Except Drug Poisoning (V01-X39, X43, X45-X59, Y85-Y86): bigint, All Other Causes: bigint, Alzheimer's Disease (G30): bigint, Anemias (D50-D64): bigint, Aortic Aneurysm and Dissection (I71): bigint, Assault (Homicide U01-U02, Y871, X85-Y09): bigint, Assault (Homicide Y871, X85-Y09): bigint, Atherosclerosis (I70): bigint, Cerebrovascular Disease (Stroke I60-I69): bigint, Certain Conditions originating in the Perinatal Period (P00-P96): bigint, Chronic Liver Disease and Cirrhosis (K70, K73): bigint, Chronic Liver Disease and Cirrhosis (K70, K73-K74): bigint, Chronic Lower Respiratory Diseases (J40-J47): bigint, Congenital Malformations, Deformations, and Chromosomal Abnormalities (Q00-Q99): bigint, Diabetes Mellitus (E10-E14): bigint, Diseases of Heart (I00-I09, I11, I13, I20-I51): bigint, Essential Hypertension and Renal Diseases (I10, I12): bigint, Human Immunodeficiency Virus Disease (HIV B20-B24): bigint, Influenza (Flu) and Pneumonia (J09-J18): bigint, 

In [19]:
def find_max_cols(data_df: pyspark.sql.DataFrame):
    """
        Helper function to find the column name containing the max value in each row.
    """

    change_cols = [i for i in data_df.columns if i.endswith('_pct_change')]
    cond = "F.when" + ".when".join(["(F.col(\"" + c + "\") == F.col(\"max_value\"), F.lit(\"" + c + "\"))" for c in change_cols])
    return data_df.withColumn("max_value",F.greatest(*change_cols)) \
                                           .withColumn("MAX",eval(cond)) \
                                           .select(["max_value","MAX"])


def find_min_cols(data_df: pyspark.sql.DataFrame):
    """
        Helper function to find the column name containing the min value in each row.
    """
    change_cols = [i for i in data_df.columns if i.endswith('_pct_change')]
    cond = "F.when" + ".when".join(["(F.col(\"" + c + "\") == F.col(\"min_value\"), F.lit(\"" + c + "\"))" for c in change_cols])
    return data_df.withColumn("min_value",F.least(*change_cols)) \
                                           .withColumn("MIN",eval(cond)) \
                                           .select(["min_value","MIN"])

In [20]:
find_max_cols(leading_cause_pivot_pct_change).collect()

[Row(max_value=0.0, MAX='Accidents Except Drug Poisoning (V01-X39, X43, X45-X59, Y85-Y86)_pct_change'),
 Row(max_value=2861.11, MAX='Certain Conditions originating in the Perinatal Period (P00-P96)_pct_change'),
 Row(max_value=61.67, MAX='Assault (Homicide Y871, X85-Y09)_pct_change'),
 Row(max_value=504.08, MAX='Nephritis, Nephrotic Syndrome and Nephrisis (N00-N07, N17-N19, N25-N27)_pct_change'),
 Row(max_value=41.83, MAX='Essential Hypertension and Renal Diseases (I10, I12)_pct_change'),
 Row(max_value=62.61, MAX="Alzheimer's Disease (G30)_pct_change"),
 Row(max_value=200.0, MAX='Mental and Behavioral Disorders due to Use of Alcohol (F10)_pct_change'),
 Row(max_value=200.0, MAX='Septicemia (A40-A41)_pct_change')]

In [21]:
find_min_cols(leading_cause_pivot_pct_change).collect()

[Row(min_value=0.0, MIN='Accidents Except Drug Poisoning (V01-X39, X43, X45-X59, Y85-Y86)_pct_change'),
 Row(min_value=0.0, MIN='Anemias (D50-D64)_pct_change'),
 Row(min_value=-41.44, MIN='Chronic Lower Respiratory Diseases (J40-J47)_pct_change'),
 Row(min_value=-100.0, MIN='Aortic Aneurysm and Dissection (I71)_pct_change'),
 Row(min_value=-100.0, MIN='Viral Hepatitis (B15-B19)_pct_change'),
 Row(min_value=-100.0, MIN='Certain Conditions originating in the Perinatal Period (P00-P96)_pct_change'),
 Row(min_value=-100.0, MIN='Assault (Homicide Y871, X85-Y09)_pct_change'),
 Row(min_value=-100.0, MIN='Certain Conditions originating in the Perinatal Period (P00-P96)_pct_change')]

In [22]:
max_cols = list(set(row.MAX.replace("_pct_change","") for row in find_max_cols(leading_cause_pivot_pct_change).collect()))
min_cols = list(set(row.MIN.replace("_pct_change","") for row in find_min_cols(leading_cause_pivot_pct_change).collect()))

leading_cause_pivot_pct_change_data = leading_cause_pivot_pct_change.select(list(max_cols))
leading_cause_pivot_pct_change_data.toPandas()

Unnamed: 0,"Nephritis, Nephrotic Syndrome and Nephrisis (N00-N07, N17-N19, N25-N27)","Accidents Except Drug Poisoning (V01-X39, X43, X45-X59, Y85-Y86)",Septicemia (A40-A41),"Essential Hypertension and Renal Diseases (I10, I12)",Mental and Behavioral Disorders due to Use of Alcohol (F10),Alzheimer's Disease (G30),"Assault (Homicide Y871, X85-Y09)",Certain Conditions originating in the Perinatal Period (P00-P96)
0,0,423,0,216,0,71,299,18
1,83,1264,0,1400,0,949,780,533
2,98,961,632,1401,0,976,1261,513
3,592,1447,151,949,0,988,174,499
4,118,943,118,1346,0,567,186,8
5,81,821,128,1084,1,922,199,0
6,1,836,1,1095,3,814,0,3
7,0,899,3,1189,0,826,0,0


In [23]:
leading_cause_pivot_pct_change_data = leading_cause_pivot_pct_change.select(list(min_cols))
leading_cause_pivot_pct_change_data.toPandas()

Unnamed: 0,Anemias (D50-D64),Chronic Lower Respiratory Diseases (J40-J47),Aortic Aneurysm and Dissection (I71),"Assault (Homicide Y871, X85-Y09)",Viral Hepatitis (B15-B19),Certain Conditions originating in the Perinatal Period (P00-P96),"Accidents Except Drug Poisoning (V01-X39, X43, X45-X59, Y85-Y86)"
0,0,362,0,299,0,18,423
1,0,2785,0,780,0,533,1264
2,0,1631,513,1261,0,513,961
3,0,2308,0,174,15,499,1447
4,0,1795,0,186,0,8,943
5,0,1725,0,199,2,0,821
6,0,1634,0,0,0,3,836
7,1,1730,2,0,0,0,899


- After viewing the data, it seems that the statistics about diseases like mental and behavioral disorders due to alcohol abuse, Anemias, Viral Hepatitis etc dont have any significance associated with them due to lower case count
- The highest surge in deaths has occured due to conditions originating in the perinatal period. And the trend continues for a couple of years, and plummets to just 8 deaths from 499 in 2014.
- It seems like kidney related diseases have been prevelant throughout. Hypertension and renal diseases death toll surged in 2011, and have been stable around the 1000-1500 mark. Deaths due to Nephritis and Nephrisis increased in 2013 and 2014 and have declined since then.
- The law & order seems to be stable in the city. The deaths due to homicide surged in the years 2011 and 2012 but since then have seen a steep decline. However, there aren't any deaths recorded for the years 2016-17, which might be due to lack of data and a point of concern.


### Leading cause analysis by gender

Let's analyze this data from the perspective of sexes, and try to find any discernable patterns

In [24]:
leading_cause_female_pivot = processed_df \
                             .groupBy("year","sex") \
                             .pivot("leading_cause") \
                             .sum("deaths") \
                             .orderBy("year") \
                             .where(F.col("sex")=="Female") \
                             .na.fill(0)

leading_cause_male_pivot = processed_df \
                             .groupBy("year","sex") \
                             .pivot("leading_cause") \
                             .sum("deaths") \
                             .orderBy("year") \
                             .where(F.col("sex")=="Male") \
                             .na.fill(0)

leading_cause_female_pivot = format_data_cols(leading_cause_female_pivot)
leading_cause_male_pivot = format_data_cols(leading_cause_male_pivot)

leading_cause_female_pivot_pct_change = spark_calc_pct_change(leading_cause_female_pivot,"year")
leading_cause_male_pivot_pct_change = spark_calc_pct_change(leading_cause_male_pivot,"year")

### Female

In [25]:
find_max_cols(leading_cause_female_pivot_pct_change).collect()

[Row(max_value=0.0, MAX='sex_pct_change'),
 Row(max_value=3133.33, MAX='Essential Hypertension and Renal Diseases (I10, I12)_pct_change'),
 Row(max_value=256.25, MAX='Mental and Behavioral Disorders due to Accidental Poisoning and Other Psychoactive Substance Use (F11-F16, F18-F19, X40-X42, X44)_pct_change'),
 Row(max_value=504.08, MAX='Nephritis, Nephrotic Syndrome and Nephrisis (N00-N07, N17-N19, N25-N27)_pct_change'),
 Row(max_value=560.0, MAX='Intentional Self-Harm (Suicide X60-X84, Y870)_pct_change'),
 Row(max_value=32.63, MAX="Alzheimer's Disease (G30)_pct_change"),
 Row(max_value=316.46, MAX='Mental and Behavioral Disorders due to Accidental Poisoning and Other Psychoactive Substance Use (F11-F16, F18-F19, X40-X42, X44)_pct_change'),
 Row(max_value=100.0, MAX='Congenital Malformations, Deformations, and Chromosomal Abnormalities (Q00-Q99)_pct_change')]

In [26]:
max_cols = list(set(row.MAX.replace("_pct_change","") for row in find_max_cols(leading_cause_female_pivot_pct_change).collect()))
leading_cause_female_pivot_pct_change_data = leading_cause_female_pivot_pct_change.select(list(max_cols))
leading_cause_female_pivot_pct_change_data.toPandas()

Unnamed: 0,"Nephritis, Nephrotic Syndrome and Nephrisis (N00-N07, N17-N19, N25-N27)","Essential Hypertension and Renal Diseases (I10, I12)",sex,"Congenital Malformations, Deformations, and Chromosomal Abnormalities (Q00-Q99)",Alzheimer's Disease (G30),"Intentional Self-Harm (Suicide X60-X84, Y870)","Mental and Behavioral Disorders due to Accidental Poisoning and Other Psychoactive Substance Use (F11-F16, F18-F19, X40-X42, X44)"
0,0,33,Female,0,71,0,529
1,83,1067,Female,0,949,535,144
2,98,1066,Female,513,976,33,513
3,592,607,Female,0,489,5,63
4,90,1016,Female,484,567,33,0
5,80,590,Female,3,752,36,79
6,1,635,Female,1,774,0,329
7,0,661,Female,2,795,0,326


In [27]:
find_min_cols(leading_cause_female_pivot_pct_change).collect()

[Row(min_value=0.0, MIN='sex_pct_change'),
 Row(min_value=-100.0, MIN='Certain Conditions originating in the Perinatal Period (P00-P96)_pct_change'),
 Row(min_value=-93.83, MIN='Intentional Self-Harm (Suicide X60-X84, Y870)_pct_change'),
 Row(min_value=-100.0, MIN='Aortic Aneurysm and Dissection (I71)_pct_change'),
 Row(min_value=-100.0, MIN='Certain Conditions originating in the Perinatal Period (P00-P96)_pct_change'),
 Row(min_value=-100.0, MIN='Chronic Liver Disease and Cirrhosis (K70, K73)_pct_change'),
 Row(min_value=-100.0, MIN='Atherosclerosis (I70)_pct_change'),
 Row(min_value=-100.0, MIN='Assault (Homicide U01-U02, Y871, X85-Y09)_pct_change')]

In [28]:
min_cols = list(set(row.MIN.replace("_pct_change","") for row in find_min_cols(leading_cause_female_pivot_pct_change).collect()))
leading_cause_female_pivot_pct_change_data = leading_cause_female_pivot_pct_change.select(list(min_cols))
leading_cause_female_pivot_pct_change_data.toPandas()

Unnamed: 0,sex,Atherosclerosis (I70),Aortic Aneurysm and Dissection (I71),"Chronic Liver Disease and Cirrhosis (K70, K73)","Intentional Self-Harm (Suicide X60-X84, Y870)","Assault (Homicide U01-U02, Y871, X85-Y09)",Certain Conditions originating in the Perinatal Period (P00-P96)
0,Female,0,0,0,0,0,18
1,Female,0,0,0,535,0,0
2,Female,0,513,54,33,0,0
3,Female,0,0,0,5,0,499
4,Female,0,0,66,33,0,0
5,Female,1,0,0,36,0,0
6,Female,0,0,0,0,1,2
7,Female,0,0,0,0,0,0


### Male

In [29]:
find_max_cols(leading_cause_male_pivot_pct_change).collect()

[Row(max_value=0.0, MAX='sex_pct_change'),
 Row(max_value=504.62, MAX='Malignant Neoplasms (Cancer C00-C97)_pct_change'),
 Row(max_value=225.54, MAX='Intentional Self-Harm (Suicide X60-X84, Y870)_pct_change'),
 Row(max_value=315.34, MAX='Chronic Liver Disease and Cirrhosis (K70, K73)_pct_change'),
 Row(max_value=9.07, MAX='Mental and Behavioral Disorders due to Accidental Poisoning and Other Psychoactive Substance Use (F11-F16, F18-F19, X40-X42, X44)_pct_change'),
 Row(max_value=49.7, MAX='Essential Hypertension and Renal Diseases (I10, I12)_pct_change'),
 Row(max_value=100.0, MAX='Mental and Behavioral Disorders due to Use of Alcohol (F10)_pct_change'),
 Row(max_value=14.78, MAX='Essential Hypertension and Renal Diseases (I10, I12)_pct_change')]

In [30]:
max_cols = list(set(row.MAX.replace("_pct_change","") for row in find_max_cols(leading_cause_male_pivot_pct_change).collect()))
leading_cause_male_pivot_pct_change_data = leading_cause_male_pivot_pct_change.select(list(max_cols))
leading_cause_male_pivot_pct_change_data.toPandas()

Unnamed: 0,Malignant Neoplasms (Cancer C00-C97),"Essential Hypertension and Renal Diseases (I10, I12)",sex,Mental and Behavioral Disorders due to Use of Alcohol (F10),"Chronic Liver Disease and Cirrhosis (K70, K73)","Intentional Self-Harm (Suicide X60-X84, Y870)","Mental and Behavioral Disorders due to Accidental Poisoning and Other Psychoactive Substance Use (F11-F16, F18-F19, X40-X42, X44)"
0,1082,183,Male,0,150,711,138
1,6542,333,Male,0,172,231,534
2,6529,335,Male,0,163,752,445
3,6535,342,Male,0,677,251,474
4,6527,330,Male,0,205,254,517
5,6401,494,Male,1,166,220,761
6,6619,460,Male,2,0,0,1112
7,6502,528,Male,0,0,0,1159


In [31]:
find_min_cols(leading_cause_male_pivot_pct_change).collect()

[Row(min_value=0.0, MIN='sex_pct_change'),
 Row(min_value=-67.51, MIN='Intentional Self-Harm (Suicide X60-X84, Y870)_pct_change'),
 Row(min_value=-46.6, MIN='Chronic Lower Respiratory Diseases (J40-J47)_pct_change'),
 Row(min_value=-100.0, MIN='Certain Conditions originating in the Perinatal Period (P00-P96)_pct_change'),
 Row(min_value=-100.0, MIN="Alzheimer's Disease (G30)_pct_change"),
 Row(min_value=-100.0, MIN='Certain Conditions originating in the Perinatal Period (P00-P96)_pct_change'),
 Row(min_value=-100.0, MIN='Assault (Homicide Y871, X85-Y09)_pct_change'),
 Row(min_value=-100.0, MIN='Certain Conditions originating in the Perinatal Period (P00-P96)_pct_change')]

In [32]:
min_cols = list(set(row.MIN.replace("_pct_change","") for row in find_min_cols(leading_cause_male_pivot_pct_change).collect()))
leading_cause_male_pivot_pct_change_data = leading_cause_male_pivot_pct_change.select(list(min_cols))
leading_cause_male_pivot_pct_change_data.toPandas()

Unnamed: 0,sex,Chronic Lower Respiratory Diseases (J40-J47),Alzheimer's Disease (G30),"Assault (Homicide Y871, X85-Y09)","Intentional Self-Harm (Suicide X60-X84, Y870)",Certain Conditions originating in the Perinatal Period (P00-P96)
0,Male,0,0,299,711,0
1,Male,1354,0,780,231,533
2,Male,723,0,748,752,513
3,Male,1334,499,174,251,0
4,Male,825,0,186,254,8
5,Male,774,170,199,220,0
6,Male,737,40,0,0,1
7,Male,755,31,0,0,0


- Males are more prone to self-harm than females. The death toll rose amongst females for 2011, but since then the numbers have been mitigated since then. The deaths due to self-harm are also decreasing amongst males, which indicates a good mental health amongst the people of the city.
- Males are prone to assaults/homicide compared to females. There is only 1 female death reported in 7 years. As compared, male death tools are in the hundreds.
- It seems like Alzheimer's is a rampant issue amongst females. With the exception of 2013, the death toll has been increasing year-on-year basis. On the other hand, males seem to be suffering from cancer(Malignant Neoplasms). The number has increased six folds from 2010 to 2011 and averages around 6500 every year.

# Saving data to hdfs

In [33]:
def save_dataframes_to_hdfs(path,config,data_dfs,target_file_names):
    """
        Function to store dataframe in hdfs
        
        Input:
        
        path: the directory path to store dataframe to
        config: Config object
        data_dfs: list of PySpark DataFrames to write
        target_file_names: list of file names to store dataframes by        
    """

    for data_df,target_file_name in zip(data_dfs,target_file_names):
        print(f"Processing file: {target_file_name}")
        data_df.write.format("csv").mode("overwrite").save(f"{config.hdfs_datanode_host}/{path}/{target_file_name}")

In [34]:
data_dfs = (sex_analysis_pivot,leading_cause_pivot_pct_change_data,leading_cause_male_pivot_pct_change_data,leading_cause_female_pivot_pct_change_data)

target_file_names = ("sex_analysis_pivot.csv","leading_cause_pivot_pct_change_data.csv","leading_cause_male_pivot_pct_change_data.csv","leading_cause_female_pivot_pct_change_data.csv")

save_dataframes_to_hdfs('csv-data',config,data_dfs,target_file_names)

Processing file: sex_analysis_pivot.csv
Processing file: leading_cause_pivot_pct_change_data.csv
Processing file: leading_cause_male_pivot_pct_change_data.csv
Processing file: leading_cause_female_pivot_pct_change_data.csv


# Loading documents in Elasticsearch cluster

In [38]:
def save_dataframes_to_elasticsearch(dataframes,indices,es_write_config):
    """
       Helper function to save PySpark DataFrames to elasticsearch cluster

       Parameters
       ----------

       dataframes: list of all PySpark DataFrames
       indices: list of elasticsearch indices
       es_write_config: dict of elasticsearch write config
    """

    for dataframe,index in zip(dataframes,indices):
        print(f"Processing index:{index}")

        es_write_config['es.resource'] = index

        rdd_ = dataframe.rdd
        rdd_.map(lambda row: (None, \
                              json.dumps(row.asDict()))) \
            .saveAsNewAPIHadoopFile(path='-', \
                                    outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat", \
                                    keyClass="org.apache.hadoop.io.NullWritable", \
                                    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable", \
                                    conf=es_write_config)

In [40]:
data_dfs = (sex_analysis_pivot,leading_cause_pivot_pct_change_data,leading_cause_male_pivot_pct_change_data,leading_cause_female_pivot_pct_change_data)

target_indices = ("sex_analysis","leading_cause_analysis","leading_cause_male_analysis","leading_cause_female_analysis")

es_write_conf = {
        "es.nodes" : "elasticsearch",
        "es.port" : "9200",
        "es.input.json": "yes",
        "es.nodes.wan.only": "True",
        "es.net.http.auth.user": "elastic",
        "es.net.http.auth.pass": "elastic@1234"
    }

save_dataframes_to_elasticsearch(data_dfs,target_indices,es_write_conf)

Processing index:sex_analysis
Processing index:leading_cause_analysis
Processing index:leading_cause_male_analysis
Processing index:leading_cause_female_analysis


### This sums it up! We extracted all the files from hdfs, transformed and analyzed them and finally loaded them in our elasticsearch cluster!