1. Data Ingestion:
    - Obtain a large and diverse dataset (use public datasets).
    - Upload the dataset to HDFS.


Dataset downloaded from https://www150.statcan.gc.ca/n1/tbl/csv/13100096-eng.zip
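The archive can be unpacked before the upload in `In [1]`. Below is a minimal standard-library sketch. It assumes the ZIP has already been fetched into memory (for example with `urllib.request.urlopen(url).read()`) and that the table file inside it is `13100096.csv`, the name uploaded below. The helper names are illustrative.

```python
import io
import zipfile


def list_csv_members(zip_bytes: bytes) -> list:
    """Return the names of the .csv files inside a ZIP archive held in memory."""
    with zipfile.ZipFile(io.BytesIO(zip_bytes)) as archive:
        return [name for name in archive.namelist() if name.lower().endswith(".csv")]


def read_member(zip_bytes: bytes, member: str) -> bytes:
    """Return the raw bytes of one member of the archive."""
    with zipfile.ZipFile(io.BytesIO(zip_bytes)) as archive:
        return archive.read(member)


# Usage (assumed file name): save the table CSV next to the notebook
# with open("13100096.csv", "wb") as out:
#     out.write(read_member(zip_bytes, "13100096.csv"))
```

Listing the members first is a cheap check that the archive holds the expected file before anything is copied into HDFS.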

In [1]:
!hdfs dfs -put ../datatechonlogysolutions/dataset/13100096.csv /

2024-04-10 12:56:21,944 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
# Display the size of the file in HDFS in human-readable units
!hdfs dfs -du -h /13100096.csv

42.2 M  42.2 M  /13100096.csv


In [38]:
# Preview the header and the first five rows of the uploaded file
!hdfs dfs -cat /13100096.csv | head -n 6

﻿Year,Geography,Age_Group,Sex,Indicators,Characteristics,Value
2015,Canada (excluding territories),"Total, 12 years and over",Both sexes,"Perceived health, very good or excellent",Number of persons,18759800
2015,Canada (excluding territories),"Total, 12 years and over",Both sexes,"Perceived health, very good or excellent","Low 95% confidence interval, number of persons",18556100
2015,Canada (excluding territories),"Total, 12 years and over",Both sexes,"Perceived health, very good or excellent","High 95% confidence interval, number of persons",18963600
2015,Canada (excluding territories),"Total, 12 years and over",Both sexes,"Perceived health, very good or excellent",Percent,61.9
2015,Canada (excluding territories),"Total, 12 years and over",Both sexes,"Perceived health, very good or excellent","Low 95% confidence interval, percent",61
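Two details of the raw file matter for the Hive table in step 2. First, several fields contain commas inside double quotes. Second, the file appears to start with a UTF-8 byte order mark, the invisible character before `Year` in the first line of output above. The standard-library sketch below uses the first two lines shown above to illustrate why a quote-aware parser is needed. That is the role `quoteChar` plays for OpenCSVSerde.

```python
import csv
import io

# The first two lines of the file as raw bytes: a UTF-8 byte order mark (BOM),
# the header, then a data row whose quoted fields contain commas
raw = (
    "\ufeffYear,Geography,Age_Group,Sex,Indicators,Characteristics,Value\n"
    '2015,Canada (excluding territories),"Total, 12 years and over",Both sexes,'
    '"Perceived health, very good or excellent",Number of persons,18759800\n'
).encode("utf-8")

# "utf-8-sig" drops a leading BOM; plain "utf-8" would keep it as "\ufeff"
text = raw.decode("utf-8-sig")

# A quote-aware parser keeps "Total, 12 years and over" as a single field
header, first_row = list(csv.reader(io.StringIO(text)))

# Splitting on every comma ignores the quotes and breaks quoted fields apart
naive_fields = text.splitlines()[1].split(",")

print(header[0], len(first_row), len(naive_fields))  # Year 7 9
```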

2. Data Exploration with Hive:
    - Create a Hive table to read the dataset.
    - Explore the structure of the data using Hive queries.
    - Identify any missing or inconsistent data.
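Much of this exploration comes down to two steps: listing the distinct values of each categorical column, and spotting labels that should not be there. In Hive, that would be a query such as `SELECT Sex, COUNT(*) FROM healthdata GROUP BY Sex`. The plain-Python sketch below runs the same check on rows held as lists of strings. The row values and the allowed label set are illustrative assumptions.

```python
from collections import Counter

# Column order of the CSV / Hive table
COLUMNS = ["Year", "Geography", "Age_Group", "Sex", "Indicators", "Characteristics", "Value"]


def distinct_values(rows, column):
    """Count each distinct value of one column, like SELECT col, COUNT(*) ... GROUP BY col."""
    index = COLUMNS.index(column)
    return Counter(row[index] for row in rows)


def unexpected_values(rows, column, allowed):
    """Return the values found in `column` that are not in the `allowed` set."""
    return set(distinct_values(rows, column)) - set(allowed)


# Hypothetical rows, including the header row as it appears when loaded as data
rows = [
    ["Year", "Geography", "Age_Group", "Sex", "Indicators", "Characteristics", "Value"],
    ["2015", "Ontario", "12 to 17 years", "Males", "Current smoker, daily", "Percent", "3.1"],
    ["2015", "Ontario", "12 to 17 years", "Females", "Current smoker, daily", "Percent", "2.4"],
]
print(unexpected_values(rows, "Sex", {"Males", "Females", "Both sexes"}))  # {'Sex'}
```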


In [3]:
from pyspark.sql import SparkSession

# Initialize a Spark session with Hive support
spark = SparkSession.builder.appName("CanadianHealthHiveAnalysis").enableHiveSupport().getOrCreate()

# Create the healthdb database and list the available databases
spark.sql("CREATE DATABASE IF NOT EXISTS healthdb")
spark.sql("SHOW DATABASES").show()

# Use the healthdb database
spark.sql("USE healthdb")

# Drop the table if it exists so that this cell can be re-run
spark.sql("DROP TABLE IF EXISTS healthdata")

# Create a table over the CSV text using OpenCSVSerde, which handles quoted fields
# such as "Total, 12 years and over". OpenCSVSerde exposes every column as STRING
# whatever type is declared here, and the header row is not skipped: it is loaded
# as an ordinary data row and removed in step 3.
spark.sql("""
CREATE TABLE IF NOT EXISTS healthdata (
    Year INT,
    Geography STRING,
    Age_Group STRING,
    Sex STRING,
    Indicators STRING,
    Characteristics STRING,
    Value BIGINT
) ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES (
   "separatorChar" = ",",
   "quoteChar"     = "\\""
)""")

# Load the CSV file from HDFS into the table. Without the LOCAL keyword, the
# source file is moved (not copied) into the table's storage location.
spark.sql("LOAD DATA INPATH 'hdfs://localhost:9000/13100096.csv' INTO TABLE healthdata")


# Display the data from Hive table
spark.sql("SELECT * FROM healthdata").show(truncate=False)



+---------+
|namespace|
+---------+
|  default|
| healthdb|
+---------+



24/04/10 11:50:48 WARN HiveMetaStore: Location: file:/Users/rajprasadshrestha/Documents/groupassignments/datatechonlogysolutions/spark-warehouse/healthdb.db/healthdata specified for non-external table:healthdata

+----+------------------------------+------------------------+----------+-----------------------------------------------+-----------------------------------------------+--------+
|year|geography                     |age_group               |sex       |indicators                                     |characteristics                                |value   |
+----+------------------------------+------------------------+----------+-----------------------------------------------+-----------------------------------------------+--------+
|Year|Geography                     |Age_Group               |Sex       |Indicators                                     |Characteristics                                |Value   |
|2015|Canada (excluding territories)|Total, 12 years and over|Both sexes|Perceived health, very good or excellent       |Number of persons                              |18759800|
|2015|Canada (excluding territories)|Total, 12 years and over|Both sexes|Perceived health, very good or e
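The Hive metastore records every column of this table as `string`, which is how OpenCSVSerde exposes its columns. As a result, comparisons and sorts on these columns are textual unless the column is cast first. The small Python illustration below uses values that appear in this notebook's output.

```python
values = ["18759800", "450100", "8811300", "61.9"]

# Strings compare character by character, so "8..." sorts above "1..."
textual = sorted(values, reverse=True)

# Converting to numbers first (as CAST(... AS DOUBLE) would in SQL) gives numeric order
numeric = sorted(values, key=float, reverse=True)

print(textual)  # ['8811300', '61.9', '450100', '18759800']
print(numeric)  # ['18759800', '8811300', '450100', '61.9']
```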

3. Data Preprocessing and Cleaning:
    - Use Hive to clean and preprocess the data.
    - Handle missing values, outliers, or any data quality issues.

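Rows labelled `Both sexes`, `Canada (excluding territories)` or `Total, ...` are totals of other rows in the same table. Summing them together with their components therefore counts the same people more than once. The plain-Python sketch below mirrors the filter applied in the cleaning cell. The labels are copied from its WHERE clause, and the numbers and the age-group label are hypothetical.

```python
# Total labels, copied from the WHERE clause of the cleaning cell (In [4])
AGGREGATE_LABELS = {
    "Geography": {"Canada (excluding territories)"},
    "Sex": {"Both sexes"},
    "Age_Group": {"Total, 12 years and over", "Total, 18 years and over"},
}


def is_aggregate(record):
    """True if any column of the record carries a total label."""
    return any(record.get(column) in labels for column, labels in AGGREGATE_LABELS.items())


# Hypothetical counts for one province and age group
records = [
    {"Geography": "Ontario", "Sex": "Males", "Age_Group": "20 to 34 years", "Value": 100},
    {"Geography": "Ontario", "Sex": "Females", "Age_Group": "20 to 34 years", "Value": 120},
    {"Geography": "Ontario", "Sex": "Both sexes", "Age_Group": "20 to 34 years", "Value": 220},
]

naive_total = sum(r["Value"] for r in records)                         # 440: the 220 is counted twice
clean_total = sum(r["Value"] for r in records if not is_aggregate(r))  # 220
```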
In [4]:
# Data cleaning using Hive SQL and PySpark

print("Number of rows in the table")
spark.sql("SELECT COUNT(*) AS NO_OF_ROWS FROM healthdata").show()

# Keep only the disaggregated rows: drop the aggregate (total) rows for
# "Both sexes", "Canada (excluding territories)" and the "Total, ..." age groups,
# which would otherwise double-count persons when values are summed later.
# The header row (Sex = 'Sex', etc.) passes this filter and is removed further down.
df_new = spark.sql("""SELECT * FROM healthdata
                    WHERE Sex != 'Both sexes'
                    AND Geography != 'Canada (excluding territories)'
                    AND Age_Group != 'Total, 12 years and over'
                    AND Age_Group != 'Total, 18 years and over'""")

# Write the new DataFrame to a temporary table
df_new.write.mode("overwrite") \
    .saveAsTable("healthdata_temp")

# Drop the original table in HIVE
spark.sql("DROP TABLE healthdata")

# Write the temporary table to the original table's location, overwriting it
spark.table("healthdata_temp") \
    .write.mode('overwrite') \
    .saveAsTable("healthdata")

# Drop the temporary table
spark.sql("DROP TABLE healthdata_temp")


print("Number of rows in the table after removing the aggregate rows: ")
spark.sql("SELECT COUNT(*) AS NO_OF_ROWS FROM healthdata").show()

# Count the rows that have a NULL in at least one column
print("Number of rows with a NULL in any column: ")
spark.sql("""SELECT COUNT(*) AS NULL_VALUE_COUNT FROM healthdata
             WHERE Year IS NULL OR Geography IS NULL OR Age_Group IS NULL
             OR Sex IS NULL OR Indicators IS NULL OR Characteristics IS NULL OR Value IS NULL""").show()

spark.sql("DESCRIBE healthdata").show()

print("Before removing the first row....")
spark.sql("SELECT * FROM healthdata") \
    .show(truncate=False, n=5)


# Filter out the CSV header row, which was loaded as an ordinary data row
df_new = spark.sql("SELECT * FROM healthdata WHERE Year != 'Year'")

# Write the new DataFrame to a different table
df_new.write.mode("overwrite") \
    .saveAsTable("healthdata_temp")

# Drop the original table
spark.sql("DROP TABLE healthdata")

# Rename the new table to the original table's name
spark.sql("ALTER TABLE healthdata_temp RENAME TO healthdata")

# Display the data from the table after removing the first row
print("After removing the first row....")
spark.sql("SELECT * FROM healthdata") \
    .show(truncate=False, n=5)

Number of rows in the table


24/04/10 11:50:56 WARN HiveExternalCatalog: The table schema given by Hive metastore(struct<year:string,geography:string,age_group:string,sex:string,indicators:string,characteristics:string,value:string>) is different from the schema when this table was created by Spark SQL(struct<Year:int,Geography:string,Age_Group:string,Sex:string,Indicators:string,Characteristics:string,Value:bigint>). We have to fall back to the table schema from Hive metastore which is not case preserving.


+----------+
|NO_OF_ROWS|
+----------+
|    327166|
+----------+





Number of rows in the table after removing the aggregate rows: 
+----------+
|NO_OF_ROWS|
+----------+
|    158841|
+----------+

Number of rows with a NULL in any column: 
+----------------+
|NULL_VALUE_COUNT|
+----------------+
|               0|
+----------------+

+---------------+---------+-----------------+
|       col_name|data_type|          comment|
+---------------+---------+-----------------+
|           year|   string|from deserializer|
|      geography|   string|from deserializer|
|      age_group|   string|from deserializer|
|            sex|   string|from deserializer|
|     indicators|   string|from deserializer|
|characteristics|   string|from deserializer|
|          value|   string|from deserializer|
+---------------+---------+-----------------+

Before removing the first row....
+----+-------------------------+--------------+-----+----------------------------------------+-----------------------------------------------+
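The `IS NULL` check above returns 0, but that does not rule out blank fields. With a CSV SerDe, an empty field may be read as an empty string rather than NULL. We assume this for OpenCSVSerde, and it is worth confirming on the actual table. The sketch below counts both cases, column by column, over rows held as lists of strings. In Spark SQL, a per-column equivalent would be along the lines of `SUM(CASE WHEN Value IS NULL OR TRIM(Value) = '' THEN 1 ELSE 0 END)`.

```python
COLUMNS = ["Year", "Geography", "Age_Group", "Sex", "Indicators", "Characteristics", "Value"]


def missing_per_column(rows):
    """Count fields that are None, blank, or absent (short rows) in each column."""
    counts = {name: 0 for name in COLUMNS}
    for row in rows:
        # Pad short rows with None so that missing trailing fields are counted too
        padded = list(row) + [None] * (len(COLUMNS) - len(row))
        for name, field in zip(COLUMNS, padded):
            if field is None or str(field).strip() == "":
                counts[name] += 1
    return counts
```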

4. Data Analysis with PySpark:
    - Utilize PySpark SQL and the DataFrame API for analysis.
    - Calculate descriptive statistics, aggregations, or any other meaningful insights.

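The task asks for descriptive statistics, but the `df.describe()` lines in the cell below are commented out. `Value` mixes counts, percentages and confidence-interval bounds, so such statistics are only meaningful for one `Characteristics` type at a time. In PySpark, that could look like `df.filter(col("Characteristics") == "Number of persons").select("Value").describe().show()`. The sketch below spells out, in plain Python, the summary that `describe()` reports: count, mean, sample standard deviation, min and max.

```python
import statistics


def describe(values):
    """Return count, mean, stddev (sample, n - 1), min and max of a list of numbers."""
    if len(values) < 2:
        raise ValueError("need at least two values for a sample standard deviation")
    return {
        "count": len(values),
        "mean": statistics.mean(values),
        "stddev": statistics.stdev(values),
        "min": min(values),
        "max": max(values),
    }
```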
In [36]:
from pyspark.sql.functions import col

# Convert the Hive healthdata table to a DataFrame for analysis with the PySpark DataFrame API
df = spark.sql("SELECT * FROM healthdata")
# df.show(truncate=False)
# df.printSchema()

# # Descriptive statistics of the Value column
# print("Descriptive statistics of the Value column are...")
# df.describe().show()


# Convert the Value column (stored as a string) to an integer type.
# Value also holds percentages such as 61.9, which a cast to int truncates in
# Spark's non-ANSI mode; only "Number of persons" rows are summed below.
df = df.withColumn("Value", col("Value").cast("int"))


# Count the distinct values in the Indicators column
print("Number of distinct values in the Indicators (health conditions) column: ")
print(df.select("Indicators").distinct().count())



# Objective 1: find the top 5 provinces by number of persons for the smoking, cannabis and vaping indicators
# Question 1: number of smokers, cannabis users and e-cigarette users in Canada
indicators_of_interest = [
    "Current smoker, daily or occasional",
    "Current smoker, daily",
    "Cannabis use, past 12 months",
    "Cannabis frequency of use in the past months, daily or almost daily",
    "Ever used e-cigarette or vaping device",
    "Used e-cigarette or vaping device, past 30 days",
]

# Note: the totals below sum Value over all six indicators, so a person who
# reports several of them is counted once per indicator
df_filtered = df.filter(col("Indicators").isin(indicators_of_interest))

#Rename the Geography column to Province
df_filtered = df_filtered.withColumnRenamed("Geography", "Province")

# Using the PySpark DataFrame API: top 5 provinces by total number of persons across the selected indicators in 2022
print("Top 5 provinces by total number of persons across the selected smoking, cannabis and vaping indicators in 2022")
df_top_5_provinces = df_filtered.filter(col("Characteristics") == "Number of persons") \
    .filter(col("Year") == 2022) \
    .groupBy("Province") \
    .agg({"Value": "sum"}) \
    .withColumnRenamed("sum(Value)", "Total Number Of Persons") \
    .sort("Total Number Of Persons", ascending=False) \
    .limit(5) 

df_top_5_provinces.show(truncate=False)

import plotly.express as px

df_pd = df_top_5_provinces.toPandas()
fig = px.bar(df_pd, x='Province', y='Total Number Of Persons', title='Top 5 provinces by number of persons (smoking, cannabis and vaping indicators, 2022)', color='Province')
fig.show()
fig.write_image("top_5_provinces.pdf")


# Question 2: number of males and females per province for the selected indicators in 2022 (top 5 provinces for each sex)
df_total_no_of_males = df_filtered.filter(col("Sex") == "Males") \
        .filter(col("Characteristics") == "Number of persons") \
        .filter(col("Year") == 2022) \
        .groupBy("Province") \
        .agg({"Value": "sum"}) \
        .withColumnRenamed("sum(Value)", "Total Number Of Males") \
        .sort("Total Number Of Males", ascending=False) \
        .limit(5)


df_total_no_of_females = df_filtered.filter(col("Sex") == "Females") \
        .filter(col("Characteristics") == "Number of persons") \
        .filter(col("Year") == 2022) \
        .groupBy("Province") \
        .agg({"Value": "sum"}) \
        .withColumnRenamed("sum(Value)", "Total Number Of Females") \
        .sort("Total Number Of Females", ascending=False) \
        .limit(5)

# Merge df_total_no_of_males and df_total_no_of_females into a single DataFrame
# (an inner join keeps only provinces that appear in both top-5 lists)
df_combined = df_total_no_of_males.join(df_total_no_of_females, "Province", "inner")

# Convert the Spark DataFrame to a Pandas DataFrame
df_pd = df_combined.toPandas()

# Create a grouped bar chart
fig = px.bar(df_pd, x='Province', y=['Total Number Of Males', 'Total Number Of Females'],
             title='Comparison of Total Number of Males and Females in the Top Provinces (2022)',
             labels={'value':'Total Number', 'variable':'Gender', 'Province':'Province'},
             barmode='group')

# Show the chart
fig.show()

fig.write_image("gender.pdf")

# Question 3: number of persons by age group for the selected indicators in 2022
# (the five largest province/age-group totals)

# Aggregate the data by province and age group
df_age_groups = df_filtered.filter(col("Characteristics") == "Number of persons") \
    .filter(col("Year") == 2022) \
    .groupBy("Province", "Age_Group") \
    .agg({"Value": "sum"}) \
    .withColumnRenamed("sum(Value)", "Total Number Of Persons") \
    .sort("Total Number Of Persons", ascending=False) \
    .limit(5)

# Draw a pie chart of the result; plotly needs a pandas DataFrame, not a Spark one
df_pd = df_age_groups.toPandas()
fig = px.pie(df_pd, values='Total Number Of Persons', names='Age_Group',
             title='Number of persons by age group (five largest province/age-group totals, 2022)')
fig.show()
fig.write_image("age_groups.pdf")


# Question 4: year-wise line plot of the number of persons per province for the selected indicators (all provinces are plotted)
df_province_by_year = df_filtered.filter(col("Characteristics") == "Number of persons") \
    .groupBy("Year", "Province") \
    .agg({"Value": "sum"}) \
    .withColumnRenamed("sum(Value)", "Total Number Of Persons")

# Draw the chart using px.line, sorted by year so that each line is drawn left to right
df_pd = df_province_by_year.toPandas().sort_values('Year')

fig = px.line(df_pd, x='Year', y='Total Number Of Persons',
              title='Year-wise number of persons per province (smoking, cannabis and vaping indicators)',
              color='Province')


fig.update_layout(
    autosize=False,
    width=2000,
    height=800,
    title={'x': 0.5, 'xanchor': 'center', 'font': {'size': 24}},
    xaxis={'title': {'text': 'Year', 'font': {'size': 18}}, 'tickfont': {'size': 14}},
    yaxis={'title': {'text': 'Total Number Of Persons', 'font': {'size': 18}}, 'tickfont': {'size': 14}},
)

fig.write_image("top_5_provinces_year_wise.pdf")
fig.show()





Number of distinct values in the Indicators (health conditions) column: 
32
Top 5 provinces by total number of persons across the selected smoking, cannabis and vaping indicators in 2022
+----------------+-----------------------+
|Province        |Total Number Of Persons|
+----------------+-----------------------+
|Ontario         |8811300                |
|Quebec          |4609500                |
|British Columbia|2760700                |
|Alberta         |2595700                |
|Manitoba        |450100                 |
+----------------+-----------------------+
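The aggregation above is a group-by-sum followed by a top-k selection. Below is a plain-Python equivalent, useful for checking the logic on a few hypothetical rows. Because `df_filtered` holds six indicators, each total counts a person once per indicator they report. The result is therefore a sum of person counts rather than a count of distinct persons.

```python
import heapq
from collections import defaultdict


def top_k_totals(rows, key, value, k=5):
    """Sum `value` per `key` and return the k largest (key, total) pairs, largest first.

    Mirrors groupBy(key).agg(sum(value)).sort(desc).limit(k) on a list of dicts.
    """
    totals = defaultdict(int)
    for row in rows:
        totals[row[key]] += row[value]
    return heapq.nlargest(k, totals.items(), key=lambda item: item[1])
```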








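Question 2 keeps the top 5 provinces separately for males and for females, then inner-joins the two results. A province that appears in one top-5 list but not the other is therefore silently dropped. The sketch below contrasts an inner and an outer join on plain dictionaries, using hypothetical totals. In PySpark, either of two approaches would avoid the loss:

- passing `"outer"` as the join type, or
- computing both sexes in one aggregation with `groupBy("Province").pivot("Sex", ["Males", "Females"]).sum("Value")`.

These are suggestions, not what the notebook runs.

```python
def join_totals(left, right, how="inner"):
    """Join two {province: total} dicts; 'outer' keeps one-sided provinces with None for the missing side."""
    if how == "inner":
        keys = [k for k in left if k in right]
    elif how == "outer":
        keys = list(left) + [k for k in right if k not in left]
    else:
        raise ValueError(f"unsupported join type: {how}")
    return {k: (left.get(k), right.get(k)) for k in keys}
```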







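Question 4 plots every province, although its output file name (`top_5_provinces_year_wise.pdf`) suggests the intent was the top 5. One way to restrict the chart is to rank provinces by their total over all years and keep only the k largest. The plain-Python sketch below does this on hypothetical `(year, province, total)` rows. In PySpark, the same idea would be a second aggregation whose province names feed an `isin` filter.

```python
from collections import defaultdict


def top_provinces_series(rows, k=5):
    """Keep the (year, province, total) rows of the k provinces with the largest overall total."""
    overall = defaultdict(int)
    for _year, province, total in rows:
        overall[province] += total
    top = set(sorted(overall, key=overall.get, reverse=True)[:k])
    # Sort by year, then province, so each province's line is drawn left to right
    return sorted((row for row in rows if row[1] in top), key=lambda row: (row[0], row[1]))
```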
5. Machine Learning Exploration:
    - Explore machine learning tasks using PySpark's MLlib.
    - Experiment with classification, regression, or clustering based on the data characteristics.

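The regression below has one feature (`Year`) and leaves `regParam` at its default of 0, as the warning in the cell output confirms. Under those conditions it should match ordinary least squares, which has a closed form:

- slope = sum((x - mean_x) * (y - mean_y)) / sum((x - mean_x)^2)
- intercept = mean_y - slope * mean_x

Below is a plain-Python sketch of that textbook formula, handy for cross-checking the MLlib result on the few yearly totals. It is not MLlib's implementation.

```python
def fit_line(xs, ys):
    """Ordinary least squares fit of y = intercept + slope * x (no regularization)."""
    n = len(xs)
    if n != len(ys) or n < 2:
        raise ValueError("need at least two (x, y) pairs of equal length")
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    sxx = sum((x - mean_x) ** 2 for x in xs)
    if sxx == 0:
        raise ValueError("x values must not all be equal")
    sxy = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    slope = sxy / sxx
    intercept = mean_y - slope * mean_x
    return intercept, slope


def predict(intercept, slope, xs):
    """Evaluate the fitted line at each x."""
    return [intercept + slope * x for x in xs]
```

Feeding it the yearly Ontario totals (for example, from `df_ontario.toPandas()`) should reproduce the MLlib slope and intercept up to floating-point error.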
In [35]:
# Scatter plot of the number of persons per year in Ontario for the selected indicators
df_ontario = df_filtered.filter(col("Province") == "Ontario") \
        .filter(col("Characteristics") == "Number of persons") \
        .groupBy("Year") \
        .agg({"Value": "sum"}) \
        .withColumnRenamed("sum(Value)", "Total Number Of Persons") \
        .sort("Year", ascending=True)

# Convert the Year column to a string type
df_ontario = df_ontario.withColumn("Year", col("Year").cast("string"))

# Draw the chart for the above query using px.scatter
df_pd = df_ontario.toPandas()

fig = px.scatter(df_pd, x='Year', y='Total Number Of Persons',
                 title='Number of persons per year in Ontario (smoking, cannabis and vaping indicators)',
                 labels={'Total Number Of Persons': 'Total Number Of Persons', 'Year': 'Year'})
fig.show()


# Predict the number of persons in Ontario for the selected indicators over the next 5 years using linear regression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml import Pipeline

#Convert the Year column to a numeric type
df_ontario = df_ontario.withColumn("Year", col("Year").cast("int"))

#Create a VectorAssembler
assembler = VectorAssembler(inputCols=["Year"], outputCol="features")

#Create a LinearRegression model
lr = LinearRegression(featuresCol="features", labelCol="Total Number Of Persons")

#Create a Pipeline
pipeline = Pipeline(stages=[assembler, lr])

#Fit the model
model = pipeline.fit(df_ontario)

#Predict the number of persons for the next 5 years
df_predict = spark.createDataFrame([(2023,), (2024,), (2025,), (2026,), (2027,)], ["Year"])

#Transform the data
df_predict = model.transform(df_predict)


# Draw the observed yearly totals for Ontario as a scatter plot and the predictions as a line

df_pd = df_ontario.toPandas()
df_predict_pd = df_predict.toPandas()


fig = px.scatter(df_pd, x='Year', y='Total Number Of Persons',
                 title='Number of persons per year in Ontario (smoking, cannabis and vaping indicators) with predictions',
                 labels={'Total Number Of Persons': 'Total Number Of Persons', 'Year': 'Year'})

fig.update_traces(marker=dict(color='blue', size=10))  # Update the marker size and color

fig.add_scatter(x=df_predict_pd['Year'], y=df_predict_pd['prediction'], mode='lines',
                name='Prediction', line=dict(color='red'))  # Draw the prediction line in red

fig.update_layout(xaxis=dict(dtick=1))  # Show all the years on the x-axis

fig.show()
fig.write_image("ontario_prediction_innext5years.pdf")



# Divide the 'prediction' values by 1,000,000 and create a new column 'prediction_in_millions'
df_predict = df_predict.withColumn('prediction_in_millions', col('prediction') / 1000000)

# Select only the 'Year' and 'prediction_in_millions' columns
df_predict = df_predict.select('Year', 'prediction_in_millions')

df_predict.show()







24/04/10 12:47:58 WARN Instrumentation: [6543ec38] regParam is zero, which might cause numerical instability and overfitting.


+----+----------------------+
|Year|prediction_in_millions|
+----+----------------------+
|2023|     8.315342856907606|
|2024|     9.054560713998079|
|2025|     9.793778571088552|
|2026|    10.532996428179025|
|2027|    11.272214285269499|
+----+----------------------+
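Because the model is a straight line in `Year`, consecutive predictions differ by a constant amount, the fitted slope. Working backwards from the printed values gives a quick sanity check:

- The slope is roughly 739,000 additional person-indicator counts per year.
- The model's value for 2022 is about 7.58 million. This can be set against the 2022 Ontario total of 8,811,300 reported in step 4.

The sketch uses only the numbers printed above.

```python
# Predictions printed above, in millions of persons
predictions = {
    2023: 8.315342856907606,
    2024: 9.054560713998079,
    2025: 9.793778571088552,
    2026: 10.532996428179025,
    2027: 11.272214285269499,
}

years = sorted(predictions)
steps = [predictions[b] - predictions[a] for a, b in zip(years, years[1:])]

# A straight line adds the same amount every year: the fitted slope
slope_millions = steps[0]
print(round(slope_millions * 1_000_000))  # 739218

# Stepping back one year gives the model's value for 2022
fitted_2022 = predictions[2023] - slope_millions
print(round(fitted_2022, 3))  # 7.576
```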

