## Spark ETL project 
*The project will be done on Adult income dataset*
*link:* https://archive.ics.uci.edu/dataset/2/adult

### 1. Getting data 

In [1]:
# Installing required libraries:
!pip install pyspark psycopg2-binary sqlalchemy pandas matplotlib seaborn



In [2]:
# Importing required libraries
import findspark 
findspark.init()

In [3]:
import pyspark.sql

In [4]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructField, StructType, StringType, DoubleType, IntegerType
from sqlalchemy import *
import psycopg2
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from functools import reduce

In [5]:
# Create Spark Session
spark = SparkSession.builder.appName("ETLExample").config("spark.jars", "C:\\Users\\RANIA\\OneDrive\\postgresql-42.7.1.jar").getOrCreate()
delimiter = ","
# Define schema for the dataset 
schema = StructType([
    StructField("age", IntegerType(), True),
    StructField("workclass", StringType(), True),
    StructField("fnlwgt", DoubleType(), True),
    StructField("education", StringType(), True),
    StructField("education-num", DoubleType(), True),
    StructField("marital-status", StringType(), True),
    StructField("occupation", StringType(), True),
    StructField("relationship", StringType(), True),
    StructField("race", StringType(), True),
    StructField("sex",StringType(), True),
    StructField("capital-gain", DoubleType(), True),
    StructField("capital-loss",DoubleType(), True),
    StructField("hours-per-week",DoubleType(), True),
    StructField("native-country",StringType(), True),
    StructField("income", StringType(), True)])

In [6]:
# Load Adult Income dataset with defined schema
df = spark.read.option("delimiter", delimiter).csv("file:///Users/RANIA/OneDrive/Documents/practice_datasets/adult/adult.data", header = False,schema=schema)

# Giving id to dataframe
df_with_id = df.withColumn('Id', (F.monotonically_increasing_id()+1))

# Create temp view
df_with_id.createOrReplaceTempView("adult_income")

In [7]:
df_with_id.show()

+---+-----------------+--------+-------------+-------------+--------------------+------------------+--------------+-------------------+-------+------------+------------+--------------+--------------+------+---+
|age|        workclass|  fnlwgt|    education|education-num|      marital-status|        occupation|  relationship|               race|    sex|capital-gain|capital-loss|hours-per-week|native-country|income| Id|
+---+-----------------+--------+-------------+-------------+--------------------+------------------+--------------+-------------------+-------+------------+------------+--------------+--------------+------+---+
| 39|        State-gov| 77516.0|    Bachelors|         13.0|       Never-married|      Adm-clerical| Not-in-family|              White|   Male|      2174.0|         0.0|          40.0| United-States| <=50K|  1|
| 50| Self-emp-not-inc| 83311.0|    Bachelors|         13.0|  Married-civ-spouse|   Exec-managerial|       Husband|              White|   Male|         0.0|

In [8]:
df_with_id.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df_with_id.columns]).show()

+---+---------+------+---------+-------------+--------------+----------+------------+----+---+------------+------------+--------------+--------------+------+---+
|age|workclass|fnlwgt|education|education-num|marital-status|occupation|relationship|race|sex|capital-gain|capital-loss|hours-per-week|native-country|income| Id|
+---+---------+------+---------+-------------+--------------+----------+------------+----+---+------------+------------+--------------+--------------+------+---+
|  0|        0|     0|        0|            0|             0|         0|           0|   0|  0|           0|           0|             0|             0|     0|  0|
+---+---------+------+---------+-------------+--------------+----------+------------+----+---+------------+------------+--------------+--------------+------+---+



### 2. Performing some EDA using Spark-SQL

In [9]:
# 1. Percentage of categories in Workclass
workclass_per = spark.sql("""
Select workclass, count(*) as total_count, round(count(*)/(Select count(*) from adult_income)*100,2) as percentage
from adult_income
group by 1
order by 3 desc""")
workclass_per.show(truncate=False)

+-----------------+-----------+----------+
|workclass        |total_count|percentage|
+-----------------+-----------+----------+
| Private         |22696      |69.7      |
| Self-emp-not-inc|2541       |7.8       |
| Local-gov       |2093       |6.43      |
| ?               |1836       |5.64      |
| State-gov       |1298       |3.99      |
| Self-emp-inc    |1116       |3.43      |
| Federal-gov     |960        |2.95      |
| Without-pay     |14         |0.04      |
| Never-worked    |7          |0.02      |
+-----------------+-----------+----------+



In [10]:
# 2. Gender distribution
gender_per = spark.sql("""
SELECT sex, count(*) as count, round(count(*)/(select count(*) from adult_income)*100,2) as percentage
FROM adult_income
GROUP BY 1
ORDER BY 2 DESC""")
gender_per.show()

+-------+-----+----------+
|    sex|count|percentage|
+-------+-----+----------+
|   Male|21790|     66.92|
| Female|10771|     33.08|
+-------+-----+----------+



In [11]:
# 3. Age distribution
age_per = spark.sql("""
SELECT age_range,
       COUNT(age) AS total_count,
       ROUND(COUNT(age) / (SELECT COUNT(age) FROM adult_income) * 100, 2) AS percentage
FROM (
    SELECT *,
           CASE
               WHEN age BETWEEN 1 AND 10 THEN '1-10'
               WHEN age BETWEEN 11 AND 20 THEN '11-20'
               WHEN age BETWEEN 21 AND 30 THEN '21-30'
               WHEN age BETWEEN 31 AND 40 THEN '31-40'
               WHEN age BETWEEN 41 AND 50 THEN '41-50'
               WHEN age BETWEEN 51 AND 60 THEN '51-60'
               WHEN age BETWEEN 61 AND 70 THEN '61-70'
               WHEN age BETWEEN 71 AND 80 THEN '71-80'
               WHEN age BETWEEN 81 AND 90 THEN '81-90'
               WHEN age BETWEEN 91 AND 100 THEN '91-100'
               ELSE 'Unknown'
           END AS age_range
    FROM adult_income
) AS sub_query
GROUP BY 1
ORDER BY 1;
""")
age_per.show()

+---------+-----------+----------+
|age_range|total_count|percentage|
+---------+-----------+----------+
|    11-20|       2410|       7.4|
|    21-30|       8162|     25.07|
|    31-40|       8546|     26.25|
|    41-50|       6983|     21.45|
|    51-60|       4128|     12.68|
|    61-70|       1792|       5.5|
|    71-80|        441|      1.35|
|    81-90|         99|       0.3|
+---------+-----------+----------+



In [12]:
# 4. Occupation distribution
occupation_per = spark.sql("""
Select occupation, count(*) as total, round(count(*)/(select count(*) from adult_income)*100,2) as percentage
From adult_income
Group by 1
order by 2 desc""")
occupation_per.show()

+------------------+-----+----------+
|        occupation|total|percentage|
+------------------+-----+----------+
|    Prof-specialty| 4140|     12.71|
|      Craft-repair| 4099|     12.59|
|   Exec-managerial| 4066|     12.49|
|      Adm-clerical| 3770|     11.58|
|             Sales| 3650|     11.21|
|     Other-service| 3295|     10.12|
| Machine-op-inspct| 2002|      6.15|
|                 ?| 1843|      5.66|
|  Transport-moving| 1597|       4.9|
| Handlers-cleaners| 1370|      4.21|
|   Farming-fishing|  994|      3.05|
|      Tech-support|  928|      2.85|
|   Protective-serv|  649|      1.99|
|   Priv-house-serv|  149|      0.46|
|      Armed-Forces|    9|      0.03|
+------------------+-----+----------+



In [13]:
# 5. Native Country distribution
native_country_per = spark.sql("""
SELECT row_number() over (order by count(*) desc) as id, `native-country`, count(*) as total, round(count(*)/(SELECT count(*) FROM adult_income) * 100,2) as percentage
FROM adult_income
GROUP BY 2
ORDER BY 3 DESC""")
native_country_per.show(native_country_per.count())

+---+--------------------+-----+----------+
| id|      native-country|total|percentage|
+---+--------------------+-----+----------+
|  1|       United-States|29170|     89.59|
|  2|              Mexico|  643|      1.97|
|  3|                   ?|  583|      1.79|
|  4|         Philippines|  198|      0.61|
|  5|             Germany|  137|      0.42|
|  6|              Canada|  121|      0.37|
|  7|         Puerto-Rico|  114|      0.35|
|  8|         El-Salvador|  106|      0.33|
|  9|               India|  100|      0.31|
| 10|                Cuba|   95|      0.29|
| 11|             England|   90|      0.28|
| 12|             Jamaica|   81|      0.25|
| 13|               South|   80|      0.25|
| 14|               China|   75|      0.23|
| 15|               Italy|   73|      0.22|
| 16|  Dominican-Republic|   70|      0.21|
| 17|             Vietnam|   67|      0.21|
| 18|           Guatemala|   64|       0.2|
| 19|               Japan|   62|      0.19|
| 20|              Poland|   60|

In [14]:
# 6. Marital Status and education
marital_education_per = spark.sql("""
select `marital-status`, education, count(*) as total_count, 
round(count(*)/(select count(*) from adult_income),2) as percentage
from adult_income
group by 1,2 
order by 3 desc""")
marital_education_per.show()

+-------------------+-------------+-----------+----------+
|     marital-status|    education|total_count|percentage|
+-------------------+-------------+-----------+----------+
| Married-civ-spouse|      HS-grad|       4845|      0.15|
|      Never-married|      HS-grad|       3089|      0.09|
|      Never-married| Some-college|       2933|      0.09|
| Married-civ-spouse| Some-college|       2818|      0.09|
| Married-civ-spouse|    Bachelors|       2768|      0.09|
|      Never-married|    Bachelors|       1795|      0.06|
|           Divorced|      HS-grad|       1613|      0.05|
|           Divorced| Some-college|       1069|      0.03|
| Married-civ-spouse|      Masters|       1003|      0.03|
| Married-civ-spouse|    Assoc-voc|        689|      0.02|
|      Never-married|         11th|        586|      0.02|
|           Divorced|    Bachelors|        546|      0.02|
| Married-civ-spouse|   Assoc-acdm|        460|      0.01|
|            Widowed|      HS-grad|        414|      0.0

### 3. ETL And Data Cleaning.

In [15]:
# Data cleaning - checking for null or weid values.
df_with_id.printSchema()

root
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: double (nullable = true)
 |-- education: string (nullable = true)
 |-- education-num: double (nullable = true)
 |-- marital-status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital-gain: double (nullable = true)
 |-- capital-loss: double (nullable = true)
 |-- hours-per-week: double (nullable = true)
 |-- native-country: string (nullable = true)
 |-- income: string (nullable = true)
 |-- Id: long (nullable = false)



In [16]:
# Removing all kind of spaces in string column values
# Getting list of str columns
str_columns = [c for c in df_with_id.columns if isinstance(df_with_id.schema[c].dataType, StringType)]
df_with_id = reduce(lambda df, col_name: df.withColumn(col_name, F.trim(F.col(col_name))), str_columns, df_with_id)

In [17]:
# Checking for null rows.
df_with_id_nulls = df_with_id.filter(reduce(lambda x, y: x | y, (F.col(c).isNull() for c in df_with_id.columns)))
df_with_id_nulls.show()
df_with_id_nulls.count()

+---+---------+------+---------+-------------+--------------+----------+------------+----+---+------------+------------+--------------+--------------+------+---+
|age|workclass|fnlwgt|education|education-num|marital-status|occupation|relationship|race|sex|capital-gain|capital-loss|hours-per-week|native-country|income| Id|
+---+---------+------+---------+-------------+--------------+----------+------------+----+---+------------+------------+--------------+--------------+------+---+
+---+---------+------+---------+-------------+--------------+----------+------------+----+---+------------+------------+--------------+--------------+------+---+



0

In [18]:
# Total df count
df_with_id.count()

32561

**This suggests each column have null values**

In [19]:
df_with_id.printSchema()

root
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: double (nullable = true)
 |-- education: string (nullable = true)
 |-- education-num: double (nullable = true)
 |-- marital-status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital-gain: double (nullable = true)
 |-- capital-loss: double (nullable = true)
 |-- hours-per-week: double (nullable = true)
 |-- native-country: string (nullable = true)
 |-- income: string (nullable = true)
 |-- Id: long (nullable = false)



### 1. Data cleaning  for string data type 

In [20]:
# 1. checking for weird values in each columns -  string and integera
distinct_values = df_with_id.select("workclass").distinct().show()

+----------------+
|       workclass|
+----------------+
|Self-emp-not-inc|
|       Local-gov|
|       State-gov|
|         Private|
|     Without-pay|
|     Federal-gov|
|    Never-worked|
|               ?|
|    Self-emp-inc|
+----------------+



In [21]:
# Let's check the number of times it is used.
substring = '?'
df_weird_value = df_with_id.filter(df_with_id['workclass'].contains(substring))
df_weird_value.show()
df_weird_value.count()

+---+---------+--------+------------+-------------+--------------------+----------+-------------+------------------+------+------------+------------+--------------+--------------+------+---+
|age|workclass|  fnlwgt|   education|education-num|      marital-status|occupation| relationship|              race|   sex|capital-gain|capital-loss|hours-per-week|native-country|income| Id|
+---+---------+--------+------------+-------------+--------------------+----------+-------------+------------------+------+------------+------------+--------------+--------------+------+---+
| 54|        ?|180211.0|Some-college|         10.0|  Married-civ-spouse|         ?|      Husband|Asian-Pac-Islander|  Male|         0.0|         0.0|          60.0|         South|  >50K| 28|
| 32|        ?|293936.0|     7th-8th|          4.0|Married-spouse-ab...|         ?|Not-in-family|             White|  Male|         0.0|         0.0|          40.0|             ?| <=50K| 62|
| 25|        ?|200681.0|Some-college|        

1836

In [22]:
# It seems occupation also gets ?, let see
df_weird_value.select("Id", "workclass", "occupation").show(df_weird_value.count())


+-----+---------+----------+
|   Id|workclass|occupation|
+-----+---------+----------+
|   28|        ?|         ?|
|   62|        ?|         ?|
|   70|        ?|         ?|
|   78|        ?|         ?|
|  107|        ?|         ?|
|  129|        ?|         ?|
|  150|        ?|         ?|
|  155|        ?|         ?|
|  161|        ?|         ?|
|  188|        ?|         ?|
|  202|        ?|         ?|
|  222|        ?|         ?|
|  227|        ?|         ?|
|  244|        ?|         ?|
|  267|        ?|         ?|
|  298|        ?|         ?|
|  313|        ?|         ?|
|  327|        ?|         ?|
|  347|        ?|         ?|
|  348|        ?|         ?|
|  355|        ?|         ?|
|  398|        ?|         ?|
|  409|        ?|         ?|
|  431|        ?|         ?|
|  432|        ?|         ?|
|  450|        ?|         ?|
|  460|        ?|         ?|
|  472|        ?|         ?|
|  485|        ?|         ?|
|  487|        ?|         ?|
|  500|        ?|         ?|
|  512|       

In [23]:
# Lets check count of weird values in occupation 
distinct_val = df_with_id.select("occupation").distinct().show()

+-----------------+
|       occupation|
+-----------------+
|            Sales|
|  Exec-managerial|
|   Prof-specialty|
|Handlers-cleaners|
|  Farming-fishing|
|     Craft-repair|
| Transport-moving|
|  Priv-house-serv|
|  Protective-serv|
|    Other-service|
|     Tech-support|
|Machine-op-inspct|
|     Armed-Forces|
|                ?|
|     Adm-clerical|
+-----------------+



In [24]:
# Let's check the number of times it is used.
substring = '?'
df_occ_weird_values = df_with_id.filter(df_with_id['occupation'].contains(substring))
df_occ_weird_values.show()
df_occ_weird_values.count()

+---+---------+--------+------------+-------------+--------------------+----------+-------------+------------------+------+------------+------------+--------------+--------------+------+---+
|age|workclass|  fnlwgt|   education|education-num|      marital-status|occupation| relationship|              race|   sex|capital-gain|capital-loss|hours-per-week|native-country|income| Id|
+---+---------+--------+------------+-------------+--------------------+----------+-------------+------------------+------+------------+------------+--------------+--------------+------+---+
| 54|        ?|180211.0|Some-college|         10.0|  Married-civ-spouse|         ?|      Husband|Asian-Pac-Islander|  Male|         0.0|         0.0|          60.0|         South|  >50K| 28|
| 32|        ?|293936.0|     7th-8th|          4.0|Married-spouse-ab...|         ?|Not-in-family|             White|  Male|         0.0|         0.0|          40.0|             ?| <=50K| 62|
| 25|        ?|200681.0|Some-college|        

1843

The missing column in workclass is 1836 and for occupation it is 1843, so there are few values that are not missing in workclass and still occupation is blank/ weird.

In [25]:
# Identifying vales in workclass for weird occupation values
df_occ_weird_values.select("workclass").distinct().show()

+------------+
|   workclass|
+------------+
|Never-worked|
|           ?|
+------------+



In [26]:
# Now we can also compare with weird occ df and our original df
df_occ_weird_values.filter(df_occ_weird_values['workclass'] == "Never-worked").select("Id", "workclass", "occupation", "age").show()

+-----+------------+----------+---+
|   Id|   workclass|occupation|age|
+-----+------------+----------+---+
| 5362|Never-worked|         ?| 18|
|10846|Never-worked|         ?| 23|
|14773|Never-worked|         ?| 17|
|20338|Never-worked|         ?| 18|
|23233|Never-worked|         ?| 20|
|32305|Never-worked|         ?| 30|
|32315|Never-worked|         ?| 18|
+-----+------------+----------+---+



In [27]:
never_worked_df = df_with_id.filter(df_with_id['workclass'] == "Never-worked").select("Id", "workclass", "occupation", "age")
never_worked_df.show()

+-----+------------+----------+---+
|   Id|   workclass|occupation|age|
+-----+------------+----------+---+
| 5362|Never-worked|         ?| 18|
|10846|Never-worked|         ?| 23|
|14773|Never-worked|         ?| 17|
|20338|Never-worked|         ?| 18|
|23233|Never-worked|         ?| 20|
|32305|Never-worked|         ?| 30|
|32315|Never-worked|         ?| 18|
+-----+------------+----------+---+



From the age column and Never-worked workclass we can assume one new occupation as Student and for **?** workclass as **Unknown** value in workclass as well as occupation.

In [28]:
# Lets Remove '?' from workclass and occupation
df_with_id = df_with_id.withColumn('workclass', F.when(df_with_id['workclass'] == "?", "Unknown").otherwise(df_with_id['workclass']))
df_with_id = df_with_id.withColumn('occupation', F.when(df_with_id['workclass'] == "Never-worked", "Student").otherwise(df_with_id['occupation']))


In [29]:
df_with_id = df_with_id.withColumn('occupation', F.when(df_with_id['workclass'] == "Unknown", "Unknown").otherwise(df_with_id['occupation']))

In [30]:
# Checking Replaced vlaues
df_with_id.select("workclass").distinct().show()
df_with_id.select("occupation").distinct().show()

+----------------+
|       workclass|
+----------------+
|Self-emp-not-inc|
|       Local-gov|
|       State-gov|
|         Private|
|         Unknown|
|     Without-pay|
|     Federal-gov|
|    Never-worked|
|    Self-emp-inc|
+----------------+

+-----------------+
|       occupation|
+-----------------+
|            Sales|
|  Exec-managerial|
|          Student|
|   Prof-specialty|
|Handlers-cleaners|
|  Farming-fishing|
|     Craft-repair|
|          Unknown|
| Transport-moving|
|  Priv-house-serv|
|  Protective-serv|
|    Other-service|
|     Tech-support|
|Machine-op-inspct|
|     Armed-Forces|
|     Adm-clerical|
+-----------------+



In [31]:
# Doing on Remaining string columns
print(str_columns)

['workclass', 'education', 'marital-status', 'occupation', 'relationship', 'race', 'sex', 'native-country', 'income']


In [32]:
# 3. Marital-status
df_with_id.select("marital-status").distinct().show(truncate = False)

+---------------------+
|marital-status       |
+---------------------+
|Separated            |
|Never-married        |
|Married-spouse-absent|
|Divorced             |
|Widowed              |
|Married-AF-spouse    |
|Married-civ-spouse   |
+---------------------+



In [33]:
# 4. Education
df_with_id.select("education").distinct().show()

+------------+
|   education|
+------------+
|     Masters|
|        10th|
|     5th-6th|
|  Assoc-acdm|
|   Assoc-voc|
|     7th-8th|
|         9th|
|     HS-grad|
|   Bachelors|
|        11th|
|     1st-4th|
|   Preschool|
|        12th|
|   Doctorate|
|Some-college|
| Prof-school|
+------------+



In [34]:
# 5. Relationship
df_with_id.select("relationship").distinct().show()

+--------------+
|  relationship|
+--------------+
|     Own-child|
| Not-in-family|
|     Unmarried|
|          Wife|
|Other-relative|
|       Husband|
+--------------+



In [35]:
# 6. Race
df_with_id.select("race").distinct().show()

+------------------+
|              race|
+------------------+
|             Other|
|Amer-Indian-Eskimo|
|             White|
|Asian-Pac-Islander|
|             Black|
+------------------+



In [36]:
# 7. Sex
df_with_id.select("sex").distinct().show()

+------+
|   sex|
+------+
|Female|
|  Male|
+------+



In [37]:
# 8. Native_country
distinct_count_df = df_with_id.select("native-country").distinct()
distinct_count_df.show(distinct_count_df.count(), truncate =  False)

+--------------------------+
|native-country            |
+--------------------------+
|Philippines               |
|Germany                   |
|Cambodia                  |
|France                    |
|Greece                    |
|Taiwan                    |
|Ecuador                   |
|Nicaragua                 |
|Hong                      |
|Peru                      |
|India                     |
|China                     |
|Italy                     |
|Holand-Netherlands        |
|Cuba                      |
|South                     |
|Iran                      |
|Ireland                   |
|Thailand                  |
|Laos                      |
|El-Salvador               |
|Mexico                    |
|Guatemala                 |
|Honduras                  |
|Yugoslavia                |
|Puerto-Rico               |
|Jamaica                   |
|Canada                    |
|United-States             |
|Dominican-Republic        |
|Outlying-US(Guam-USVI-etc)|
|Japan        

In [38]:
# Lets check data for "?" in native country
weird_country_df = df_with_id.filter(df_with_id['native-country'] == "?") 
weird_country_df.show()

+---+----------------+--------+------------+-------------+--------------------+---------------+-------------+------------------+------+------------+------------+--------------+--------------+------+----+
|age|       workclass|  fnlwgt|   education|education-num|      marital-status|     occupation| relationship|              race|   sex|capital-gain|capital-loss|hours-per-week|native-country|income|  Id|
+---+----------------+--------+------------+-------------+--------------------+---------------+-------------+------------------+------+------------+------------+--------------+--------------+------+----+
| 40|         Private|121772.0|   Assoc-voc|         11.0|  Married-civ-spouse|   Craft-repair|      Husband|Asian-Pac-Islander|  Male|         0.0|         0.0|          40.0|             ?|  >50K|  15|
| 31|         Private| 84154.0|Some-college|         10.0|  Married-civ-spouse|          Sales|      Husband|             White|  Male|         0.0|         0.0|          38.0|        

In [39]:
# Trying to identify country wrt to race
weird_country_df.select("native-country", "race").distinct().show()

+--------------+------------------+
|native-country|              race|
+--------------+------------------+
|             ?|             White|
|             ?|             Other|
|             ?|Asian-Pac-Islander|
|             ?|             Black|
+--------------+------------------+



In [40]:
# Let's replace the '?' with Unknow
df_with_id = df_with_id.withColumn("native-country", F.when(df_with_id['native-country'] == '?', "Unknown").otherwise(df_with_id["native-country"]))

In [41]:
distinct_count_df = df_with_id.select("native-country").distinct()
distinct_count_df.show(distinct_count_df.count(), truncate =  False)

+--------------------------+
|native-country            |
+--------------------------+
|Philippines               |
|Germany                   |
|Cambodia                  |
|France                    |
|Greece                    |
|Taiwan                    |
|Ecuador                   |
|Nicaragua                 |
|Hong                      |
|Peru                      |
|India                     |
|China                     |
|Unknown                   |
|Italy                     |
|Holand-Netherlands        |
|Cuba                      |
|South                     |
|Iran                      |
|Ireland                   |
|Thailand                  |
|Laos                      |
|El-Salvador               |
|Mexico                    |
|Guatemala                 |
|Honduras                  |
|Yugoslavia                |
|Puerto-Rico               |
|Jamaica                   |
|Canada                    |
|United-States             |
|Dominican-Republic        |
|Outlying-US(G

There is a missing value in Native country

In [42]:
# 9. Income
df_with_id.select("income").distinct().show()

+------+
|income|
+------+
| <=50K|
|  >50K|
+------+



### 2. Data Cleaning for interger data type

In [43]:
numerical_columns = [col_name for col_name, data_type in df_with_id.dtypes if isinstance(df_with_id.schema[col_name].dataType, (DoubleType, IntegerType))]
print(numerical_columns)

['age', 'fnlwgt', 'education-num', 'capital-gain', 'capital-loss', 'hours-per-week']


In [44]:
# Checking If any of the column has Null values
for col_name in numerical_columns:
    null_counts = df_with_id.where(F.col(col_name).isNull()).count()
    if null_counts > 0:
        print(f"{col_name} has {null_counts} null values")
    else:
        print(f"{col_name} doesnot have any values")

age doesnot have any values
fnlwgt doesnot have any values
education-num doesnot have any values
capital-gain doesnot have any values
capital-loss doesnot have any values
hours-per-week doesnot have any values


In [45]:
# Final checking if any of the column have null vales
col_names = df.columns
for col_name in col_names:
    null_counts = df_with_id.where(F.col(col_name).isNull()).count()
    if null_counts > 0:
        print(f"{col_name} has {null_counts} null values")
    else:
        print(f"{col_name} doesnot have any values")

age doesnot have any values
workclass doesnot have any values
fnlwgt doesnot have any values
education doesnot have any values
education-num doesnot have any values
marital-status doesnot have any values
occupation doesnot have any values
relationship doesnot have any values
race doesnot have any values
sex doesnot have any values
capital-gain doesnot have any values
capital-loss doesnot have any values
hours-per-week doesnot have any values
native-country doesnot have any values
income doesnot have any values


### 3. Data Modelling

In [46]:
# Creating Fact Table and Dimension Table:
fact_table= df_with_id.select("Id", "age", "workclass", "education","marital-status", "occupation", "relationship", "race", "sex",
                     "capital-gain", "capital-loss", "hours-per-week", "native-country", "income")
education_dim = df_with_id.select("education","education-num").distinct()
marital_status_dim = df_with_id.select("marital-status").distinct()
occupation_dim = df_with_id.select("occupation").distinct()

In [47]:
# Adding Id's to each table:
education_dim = education_dim.withColumn("education_id", F.monotonically_increasing_id()+1)
marital_status_dim = marital_status_dim.withColumn("marital_status_id", F.monotonically_increasing_id()+1)
occupation_dim = occupation_dim.withColumn("occupation_id", F.monotonically_increasing_id()+1)

In [48]:
fact_table.show()
education_dim.show()
marital_status_dim.show()
occupation_dim.show()

+---+---+----------------+------------+--------------------+-----------------+-------------+------------------+------+------------+------------+--------------+--------------+------+
| Id|age|       workclass|   education|      marital-status|       occupation| relationship|              race|   sex|capital-gain|capital-loss|hours-per-week|native-country|income|
+---+---+----------------+------------+--------------------+-----------------+-------------+------------------+------+------------+------------+--------------+--------------+------+
|  1| 39|       State-gov|   Bachelors|       Never-married|     Adm-clerical|Not-in-family|             White|  Male|      2174.0|         0.0|          40.0| United-States| <=50K|
|  2| 50|Self-emp-not-inc|   Bachelors|  Married-civ-spouse|  Exec-managerial|      Husband|             White|  Male|         0.0|         0.0|          13.0| United-States| <=50K|
|  3| 38|         Private|     HS-grad|            Divorced|Handlers-cleaners|Not-in-famil

In [49]:
# Replacing values with their respective id's in fact table
joined_df = fact_table.join(occupation_dim, "occupation","left").join(education_dim, "education", "left").join(marital_status_dim, "marital-status", "left")
joined_df.show()

+--------------------+------------+-----------------+---+---+----------------+-------------+------------------+------+------------+------------+--------------+--------------+------+-------------+-------------+------------+-----------------+
|      marital-status|   education|       occupation| Id|age|       workclass| relationship|              race|   sex|capital-gain|capital-loss|hours-per-week|native-country|income|occupation_id|education-num|education_id|marital_status_id|
+--------------------+------------+-----------------+---+---+----------------+-------------+------------------+------+------------+------------+--------------+--------------+------+-------------+-------------+------------+-----------------+
|       Never-married|   Bachelors|     Adm-clerical|  1| 39|       State-gov|Not-in-family|             White|  Male|      2174.0|         0.0|          40.0| United-States| <=50K|           16|         13.0|           4|                2|
|  Married-civ-spouse|   Bachelors| 

In [50]:
# Dropping the not required columns.
col_to_drop = ['education','occupation','marital-status']
adult_income_mst = joined_df.drop(*col_to_drop)

In [51]:
adult_income_mst.printSchema()

root
 |-- Id: long (nullable = false)
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital-gain: double (nullable = true)
 |-- capital-loss: double (nullable = true)
 |-- hours-per-week: double (nullable = true)
 |-- native-country: string (nullable = true)
 |-- income: string (nullable = true)
 |-- occupation_id: long (nullable = true)
 |-- education-num: double (nullable = true)
 |-- education_id: long (nullable = true)
 |-- marital_status_id: long (nullable = true)



In [52]:
adult_income_mst = adult_income_mst.withColumnRenamed("capital-loss", "capital_loss").withColumnRenamed("capital-gain", "capital_gain").withColumnRenamed("capital-loss", "capital_loss").withColumnRenamed("hours-per-week", "hours_per_week").\
withColumnRenamed("native-country", "native_ountry").withColumnRenamed("education-num", "education_num")

In [53]:
adult_income_mst.printSchema()

root
 |-- Id: long (nullable = false)
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: double (nullable = true)
 |-- capital_loss: double (nullable = true)
 |-- hours_per_week: double (nullable = true)
 |-- native_ountry: string (nullable = true)
 |-- income: string (nullable = true)
 |-- occupation_id: long (nullable = true)
 |-- education_num: double (nullable = true)
 |-- education_id: long (nullable = true)
 |-- marital_status_id: long (nullable = true)



In [54]:
adult_income_mst = adult_income_mst.withColumnRenamed( "native_ountry", "native_country")

### 4. Data transfer to postgres -SQL

In [58]:
# I need to transfer these tables to postgres - Sql database
# Configure JDBC connection properties
jdbc_url = "jdbc:postgresql://localhost:5432/adult_income"

In [59]:
connection_properties = {
    "user": "postgres",
    "password": "rania_26",
    "driver": "org.postgresql.Driver"
}

In [60]:
# Writing data to database.
adult_income_mst.write\
        .format("jdbc")\
        .option("adult_income","adult_income_mst")\
        .mode("overwrite")\
        .jdbc(jdbc_url, "adult_income_mst",properties = connection_properties)

In [61]:
adult_income_mst.count()

32561

In [65]:
education_dim.printSchema()
marital_status_dim.printSchema()
occupation_dim.printSchema()

root
 |-- education: string (nullable = true)
 |-- education_num: double (nullable = true)
 |-- education_id: long (nullable = false)

root
 |-- marital_status: string (nullable = true)
 |-- marital_status_id: long (nullable = false)

root
 |-- occupation: string (nullable = true)
 |-- occupation_id: long (nullable = false)



In [63]:
education_dim = education_dim.withColumnRenamed("education-num", "education_num")
marital_status_dim = marital_status_dim.withColumnRenamed("marital-status", "marital_status")

In [66]:
# Writing data to database.
education_dim.write\
        .format("jdbc")\
        .option("adult_income","education_dim")\
        .mode("overwrite")\
        .jdbc(jdbc_url, "education_dim",properties = connection_properties)

In [67]:
# Writing data to database.
marital_status_dim.write\
        .format("jdbc")\
        .option("adult_income","marital_status_dim")\
        .mode("overwrite")\
        .jdbc(jdbc_url, "marital_status_dim",properties = connection_properties)