In [1]:
from pyspark.context import SparkContext

sc = SparkContext('local', 'test')

from pyspark.sql import SQLContext

spark = SQLContext(sc)

df_train = spark.read.parquet("parquet/train")
df_test = spark.read.parquet("parquet/test")

In [4]:
df_train.toPandas()

Unnamed: 0,age,workclass,fnlwgt,education,education-num,marital-status,occupation,relationship,race,sex,capital-gain,capital-loss,hours-per-week,native-country,income
0,39.0,State-gov,77516.0,Bachelors,13.0,Never-married,Adm-clerical,Not-in-family,White,Male,2174.0,0.0,40.0,United-States,<=50K
1,50.0,Self-emp-not-inc,83311.0,Bachelors,13.0,Married-civ-spouse,Exec-managerial,Husband,White,Male,0.0,0.0,13.0,United-States,<=50K
2,38.0,Private,215646.0,HS-grad,9.0,Divorced,Handlers-cleaners,Not-in-family,White,Male,0.0,0.0,40.0,United-States,<=50K
3,53.0,Private,234721.0,11th,7.0,Married-civ-spouse,Handlers-cleaners,Husband,Black,Male,0.0,0.0,40.0,United-States,<=50K
4,28.0,Private,338409.0,Bachelors,13.0,Married-civ-spouse,Prof-specialty,Wife,Black,Female,0.0,0.0,40.0,Cuba,<=50K
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
32556,27.0,Private,257302.0,Assoc-acdm,12.0,Married-civ-spouse,Tech-support,Wife,White,Female,0.0,0.0,38.0,United-States,<=50K
32557,40.0,Private,154374.0,HS-grad,9.0,Married-civ-spouse,Machine-op-inspct,Husband,White,Male,0.0,0.0,40.0,United-States,>50K
32558,58.0,Private,151910.0,HS-grad,9.0,Widowed,Adm-clerical,Unmarried,White,Female,0.0,0.0,40.0,United-States,<=50K
32559,22.0,Private,201490.0,HS-grad,9.0,Never-married,Adm-clerical,Own-child,White,Male,0.0,0.0,20.0,United-States,<=50K


In [6]:
df_train.select("age").describe().show()

+-------+------------------+
|summary|               age|
+-------+------------------+
|  count|             32561|
|   mean| 38.58164675532078|
| stddev|13.640432553581356|
|    min|              17.0|
|    max|              90.0|
+-------+------------------+



In [10]:
df_train.select("relationship").groupby('relationship').count().show()

+---------------+-----+
|   relationship|count|
+---------------+-----+
|        Husband|13193|
|      Own-child| 5068|
|  Not-in-family| 8305|
| Other-relative|  981|
|           Wife| 1568|
|      Unmarried| 3446|
+---------------+-----+



In [12]:
df_train.select("hours-per-week").describe().show()

+-------+------------------+
|summary|    hours-per-week|
+-------+------------------+
|  count|             32561|
|   mean|40.437455852092995|
| stddev|12.347428681731838|
|    min|               1.0|
|    max|              99.0|
+-------+------------------+



In [25]:
from pyspark.sql import functions as F


def hours_bucket_udf(hours):
    if hours<28:
        return 'low'
    elif hours<52:
        return 'mid'
    else:
        return 'high'

hours_udf = F.udf(hours_bucket_udf)

df_train_small = df_train.select("age","sex","hours-per-week","income") \
    .withColumn("age_bucket", F.when(F.col("age")<38,"low").otherwise("high")) \
    .withColumn("hours_bucket", hours_udf(F.col("hours-per-week"))) \
    .withColumn("label", F.when(F.col("income")==' <=50K',0).otherwise(1)) \
    .select("sex","age_bucket","hours_bucket","label")

df_train_small.show()

+-------+----------+------------+-----+
|    sex|age_bucket|hours_bucket|label|
+-------+----------+------------+-----+
|   Male|      high|         mid|    0|
|   Male|      high|         low|    0|
|   Male|      high|         mid|    0|
|   Male|      high|         mid|    0|
| Female|       low|         mid|    0|
| Female|       low|         mid|    0|
| Female|      high|         low|    0|
|   Male|      high|         mid|    1|
| Female|       low|         mid|    1|
|   Male|      high|         mid|    1|
|   Male|       low|        high|    1|
|   Male|       low|         mid|    1|
| Female|       low|         mid|    0|
|   Male|       low|         mid|    0|
|   Male|      high|         mid|    1|
|   Male|       low|         mid|    0|
|   Male|       low|         mid|    0|
|   Male|       low|         mid|    0|
|   Male|      high|         mid|    0|
| Female|      high|         mid|    1|
+-------+----------+------------+-----+
only showing top 20 rows



In [29]:
df_train_small.groupby("sex","label").count().sort("sex").show()

+-------+-----+-----+
|    sex|label|count|
+-------+-----+-----+
| Female|    0| 9592|
| Female|    1| 1179|
|   Male|    0|15128|
|   Male|    1| 6662|
+-------+-----+-----+



In [34]:
import math
def entropy(probabs):
    return -sum([ p * math.log(p,2) for p in probabs])

In [36]:
entropy([0.01,0.99])

0.08079313589591118

In [45]:
# entropía Male
p_male_0 = 15128/(6662+15128)
p_male_1 = 6662/(6662+15128)
entropy_male = entropy([p_male_0,p_male_1])
entropy_male

0.8881906136796107

In [46]:
# entropía Female
p_female_0 = 9592/(1179+9592)
p_female_1 = 1- p_female_0
entropy_female = entropy([p_female_0,p_female_1])
entropy_female

0.49828678366738155

In [47]:
# "entropía" de "sex" como atributo
males = 6662+15128
females = 1179+9592
population = males + females
entropy_sex= entropy_male*(males/population) + entropy_female * (females/population)
entropy_sex

0.759212567763892

In [49]:
# consideremos age_bucket

df_train_small.groupby("age_bucket","label").count().sort("age_bucket").show()

+----------+-----+-----+
|age_bucket|label|count|
+----------+-----+-----+
|      high|    1| 5581|
|      high|    0|10299|
|       low|    1| 2260|
|       low|    0|14421|
+----------+-----+-----+



In [53]:
entropy_high = entropy([5581/(5581+10299),10299/(5581+10299)])
entropy_low = entropy([2260/(2260+14421),14421/(2260+14421)])
entropy_low,entropy_high

(0.5722871298658747, 0.9353549188478923)

In [54]:
def entropy_from_values(values):
  # example: values [5581,10299]
    sum_values = sum(values)
    probabilities = [value/sum_values for value in values]
    return entropy(probabilities)

entropy_high = entropy_from_values([5581,10299])
entropy_low = entropy_from_values([2260, 14421])
entropy_low,entropy_high

(0.5722871298658747, 0.9353549188478923)

In [75]:
weighted_entropy = entropy_low *(2260 + 14421)/(5581+ 10299+2260 + 14421)  + entropy_high*(5581+10299)/(5581+ 10299+2260 + 14421) 
weighted_entropy

0.7493552938975212

In [94]:
table_collected = df_train_small.groupby("age_bucket","label").count().sort("age_bucket").collect()
table_collected

[Row(age_bucket='high', label=1, count=5581),
 Row(age_bucket='high', label=0, count=10299),
 Row(age_bucket='low', label=1, count=2260),
 Row(age_bucket='low', label=0, count=14421)]

In [95]:
attribute = 'age_bucket'
distinct_attribute_values = set([row[attribute] for row in table_collected])
distinct_attribute_values


{'high', 'low'}

In [97]:
def attribute_values(table_collected, attribute, distinct_attribute_values):
    counts_and_totals = []
    for attr_value in distinct_attribute_values:
        values_attr = [row['count'] for row in table_collected if row[attribute]==attr_value]
        sum_values_attr  = sum(values_attr)
        counts_and_totals = counts_and_totals + [(values_attr, sum_values_attr)]
    return counts_and_totals

def weighted_entropy_table(table_collected, attribute):
    distinct_attribute_values = set([row[attribute] for row in table_collected])
    attr_values = attribute_values(table_collected, attribute, distinct_attribute_values)
    sum_splits = sum([totals for values, totals in attr_values])
    return sum([(entropy_from_values(values)* totals)/sum_splits for values, totals in attr_values])
    
weighted_entropy_table(table_collected,'age_bucket')

0.7493552938975212

In [98]:
table_collected = df_train_small.groupby("hours_bucket","label").count().sort("hours_bucket").collect()
table_collected

[Row(hours_bucket='high', label=1, count=1501),
 Row(hours_bucket='high', label=0, count=2129),
 Row(hours_bucket='low', label=1, count=264),
 Row(hours_bucket='low', label=0, count=3739),
 Row(hours_bucket='mid', label=1, count=6076),
 Row(hours_bucket='mid', label=0, count=18852)]

In [99]:
weighted_entropy_table(table_collected,'hours_bucket')

0.7655570162856272

In [100]:
table_collected = df_train_small.groupby("sex","label").count().sort("sex").collect()
table_collected

[Row(sex=' Female', label=0, count=9592),
 Row(sex=' Female', label=1, count=1179),
 Row(sex=' Male', label=0, count=15128),
 Row(sex=' Male', label=1, count=6662)]

In [101]:
weighted_entropy_table(table_collected,'sex')

0.7592125677638919

In [102]:
def weighted_entropy_for_attribute(df_train_small, attribute):
    table_collected = df_train_small.groupby(attribute,"label").count().sort(attribute).collect()
    return weighted_entropy_table(table_collected,attribute)


In [103]:
[ (attribute, weighted_entropy_for_attribute(df_train_small, attribute)) for attribute in ['sex','hours_bucket','age_bucket']]

[('sex', 0.7592125677638919),
 ('hours_bucket', 0.7655570162856272),
 ('age_bucket', 0.7493552938975212)]

In [105]:
df_train_small.show()

+-------+----------+------------+-----+
|    sex|age_bucket|hours_bucket|label|
+-------+----------+------------+-----+
|   Male|      high|         mid|    0|
|   Male|      high|         low|    0|
|   Male|      high|         mid|    0|
|   Male|      high|         mid|    0|
| Female|       low|         mid|    0|
| Female|       low|         mid|    0|
| Female|      high|         low|    0|
|   Male|      high|         mid|    1|
| Female|       low|         mid|    1|
|   Male|      high|         mid|    1|
|   Male|       low|        high|    1|
|   Male|       low|         mid|    1|
| Female|       low|         mid|    0|
|   Male|       low|         mid|    0|
|   Male|      high|         mid|    1|
|   Male|       low|         mid|    0|
|   Male|       low|         mid|    0|
|   Male|       low|         mid|    0|
|   Male|      high|         mid|    0|
| Female|      high|         mid|    1|
+-------+----------+------------+-----+
only showing top 20 rows



In [106]:
df_train_small_age_high = df_train_small.where(F.col('age_bucket')=='high')

In [108]:
df_train_small_age_high.show()

+-------+----------+------------+-----+
|    sex|age_bucket|hours_bucket|label|
+-------+----------+------------+-----+
|   Male|      high|         mid|    0|
|   Male|      high|         low|    0|
|   Male|      high|         mid|    0|
|   Male|      high|         mid|    0|
| Female|      high|         low|    0|
|   Male|      high|         mid|    1|
|   Male|      high|         mid|    1|
|   Male|      high|         mid|    1|
|   Male|      high|         mid|    0|
| Female|      high|         mid|    1|
|   Male|      high|        high|    1|
| Female|      high|         low|    0|
|   Male|      high|         mid|    0|
| Female|      high|         mid|    0|
|   Male|      high|         mid|    1|
|   Male|      high|        high|    1|
|   Male|      high|        high|    0|
|   Male|      high|         mid|    0|
|   Male|      high|         mid|    0|
|   Male|      high|         mid|    0|
+-------+----------+------------+-----+
only showing top 20 rows



In [109]:
[ (attribute, weighted_entropy_for_attribute(df_train_small_age_high, attribute)) for attribute in ['sex','hours_bucket']]

[('sex', 0.8826664526766186), ('hours_bucket', 0.9098707892103722)]

In [110]:
df_train_small_age_low = df_train_small.where(F.col('age_bucket')=='low')
[ (attribute, weighted_entropy_for_attribute(df_train_small_age_low, attribute)) for attribute in ['sex','hours_bucket']]

[('sex', 0.5557044933658458), ('hours_bucket', 0.5433274609106593)]

In [None]:
#                      age_bucket
#                 / (low)           \ (high)
#               hours_bucket            sex
#              /      |    \           /   \
