In [1]:
import findspark
findspark.init()

import pyspark
from pyspark import SparkContext
# Reuse the active SparkContext when one already exists: constructing a second
# SparkContext in the same kernel (for example when this cell is re-run)
# raises an error, whereas getOrCreate() returns the existing one.
sc = SparkContext.getOrCreate()

In [2]:
from pyspark.sql import SQLContext
from pyspark import SparkFiles

# Remote location of the adult_data.csv file used throughout the notebook.
url = "https://github.com/guru99-edu/R-Programming/raw/master/adult_data.csv"
    
# Register the file with Spark; it is looked up again by its base name through
# SparkFiles.get("adult_data.csv") when the DataFrame is read.
sc.addFile(url)
# SQL entry point used to read the CSV and to build DataFrames in later cells.
sqlContext = SQLContext(sc)

In [3]:
# Locate the file registered with sc.addFile, then parse it using the first
# row as the header and letting Spark infer the column types.
adult_csv_path = SparkFiles.get("adult_data.csv")
df = sqlContext.read.csv(adult_csv_path, header=True, inferSchema=True)


In [4]:
# Show the column names and the types inferred while reading the CSV.
df.printSchema()

root
 |-- x: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- educational-num: integer (nullable = true)
 |-- marital-status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- capital-gain: integer (nullable = true)
 |-- capital-loss: integer (nullable = true)
 |-- hours-per-week: integer (nullable = true)
 |-- native-country: string (nullable = true)
 |-- income: string (nullable = true)



In [12]:
# Preview a few of the integer columns (show() prints the first 20 rows).
df.select("x", "age", "fnlwgt", "educational-num").show()

+---+---+------+---------------+
|  x|age|fnlwgt|educational-num|
+---+---+------+---------------+
|  1| 25|226802|              7|
|  2| 38| 89814|              9|
|  3| 28|336951|             12|
|  4| 44|160323|             10|
|  5| 18|103497|             10|
|  6| 34|198693|              6|
|  7| 29|227026|              9|
|  8| 63|104626|             15|
|  9| 24|369667|             10|
| 10| 55|104996|              4|
| 11| 65|184454|              9|
| 12| 36|212465|             13|
| 13| 26| 82091|              9|
| 14| 58|299831|              9|
| 15| 48|279724|              9|
| 16| 43|346189|             14|
| 17| 20|444554|             10|
| 18| 43|128354|              9|
| 19| 37| 60548|              9|
| 20| 40| 85019|             16|
+---+---+------+---------------+
only showing top 20 rows



In [35]:
from pyspark.sql.types import FloatType

def convertColumn(df, names, newType):
    """Return ``df`` with every column listed in ``names`` cast to ``newType``.

    Args:
        df: DataFrame-like object supporting ``df[name]`` and ``withColumn``.
        names: iterable of column names to cast.
        newType: target type passed to each column's ``cast``.

    Returns:
        The DataFrame produced by the successive ``withColumn`` calls; ``df``
        itself is returned unchanged when ``names`` is empty.
    """
    converted = df
    for column_name in names:
        # Replace the column in place (same name) with its recast version.
        recast = converted[column_name].cast(newType)
        converted = converted.withColumn(column_name, recast)
    return converted

# Continuous features: the numeric columns other than the "x" column.
# - "x" is excluded by exact name; the previous substring test
#   (`"x" in name`) would also have dropped any numeric column whose name
#   merely contains the letter "x".
# - "float" is accepted next to "int" so that re-running this cell after the
#   cast below still yields the same list instead of an empty one.
CONTI_FEATURES = [name for name, dtype in df.dtypes
                  if dtype in ("int", "float") and name != "x"]

print(CONTI_FEATURES)

# Cast the selected columns to float.
df = convertColumn(df, CONTI_FEATURES, FloatType())

df.printSchema()

['age', 'fnlwgt', 'educational-num', 'capital-gain', 'capital-loss', 'hours-per-week']
root
 |-- x: integer (nullable = true)
 |-- age: float (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: float (nullable = true)
 |-- education: string (nullable = true)
 |-- educational-num: float (nullable = true)
 |-- marital-status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- capital-gain: float (nullable = true)
 |-- capital-loss: float (nullable = true)
 |-- hours-per-week: float (nullable = true)
 |-- native-country: string (nullable = true)
 |-- income: string (nullable = true)



In [40]:
# Number of rows per education level, rarest level first.
df.groupBy("education").count().orderBy("count").show()

+------------+-----+
|   education|count|
+------------+-----+
|   Preschool|   83|
|     1st-4th|  247|
|     5th-6th|  509|
|   Doctorate|  594|
|        12th|  657|
|         9th|  756|
| Prof-school|  834|
|     7th-8th|  955|
|        10th| 1389|
|  Assoc-acdm| 1601|
|        11th| 1812|
|   Assoc-voc| 2061|
|     Masters| 2657|
|   Bachelors| 8025|
|Some-college|10878|
|     HS-grad|15784|
+------------+-----+



In [41]:
# Summary statistics (count, mean, stddev, min, max) of the continuous columns.
df.select(CONTI_FEATURES).describe().show()

+-------+------------------+------------------+------------------+------------------+-----------------+------------------+
|summary|               age|            fnlwgt|   educational-num|      capital-gain|     capital-loss|    hours-per-week|
+-------+------------------+------------------+------------------+------------------+-----------------+------------------+
|  count|             48842|             48842|             48842|             48842|            48842|             48842|
|   mean| 38.64358543876172|189664.13459727284|10.078088530363212|1079.0676262233324|87.50231358257237|40.422382375824085|
| stddev|13.710509934443502|105604.02542315757| 2.570972755592252| 7452.019057655413|403.0045521243591|12.391444024252289|
|    min|              17.0|           12285.0|               1.0|               0.0|              0.0|               1.0|
|    max|              90.0|         1490400.0|              16.0|           99999.0|           4356.0|              99.0|
+-------+-------

In [42]:
# Contingency table of age against income, ordered by age. The first column
# of the crosstab result is named "age_income", which is why it is the sort key.
df.crosstab("age", "income").sort("age_income").show()

+----------+-----+----+
|age_income|<=50K|>50K|
+----------+-----+----+
|      17.0|  595|   0|
|      18.0|  862|   0|
|      19.0| 1050|   3|
|      20.0| 1112|   1|
|      21.0| 1090|   6|
|      22.0| 1161|  17|
|      23.0| 1307|  22|
|      24.0| 1162|  44|
|      25.0| 1119|  76|
|      26.0| 1068|  85|
|      27.0| 1117| 115|
|      28.0| 1101| 179|
|      29.0| 1025| 198|
|      30.0| 1031| 247|
|      31.0| 1050| 275|
|      32.0|  957| 296|
|      33.0| 1045| 290|
|      34.0|  949| 354|
|      35.0|  997| 340|
|      36.0|  948| 400|
+----------+-----+----+
only showing top 20 rows



In [47]:
# Contingency table of gender against income, ordered by gender.
df.crosstab("gender", "income").sort("gender_income").show()

+-------------+-----+----+
|gender_income|<=50K|>50K|
+-------------+-----+----+
|       Female|14423|1769|
|         Male|22732|9918|
+-------------+-----+----+



In [45]:
# Display the column list without 'educational-num'. The result of drop() is
# not assigned, so df itself still contains the column afterwards.
df.drop('educational-num').columns

['x',
 'age',
 'workclass',
 'fnlwgt',
 'education',
 'marital-status',
 'occupation',
 'relationship',
 'race',
 'gender',
 'capital-gain',
 'capital-loss',
 'hours-per-week',
 'native-country',
 'income']

In [59]:
# Number of rows with an age above 40.
df.where(df["age"] > 40).count()

20211

In [60]:
# Average capital gain for each marital status.
df.groupby('marital-status').agg({'capital-gain': 'mean'}).show()

+--------------------+------------------+
|      marital-status| avg(capital-gain)|
+--------------------+------------------+
|           Separated| 581.8424836601307|
|       Never-married|  384.382639449029|
|Married-spouse-ab...| 629.0047770700637|
|            Divorced| 793.6755615860094|
|             Widowed| 603.6442687747035|
|   Married-AF-spouse|2971.6216216216217|
|  Married-civ-spouse|1739.7006121810625|
+--------------------+------------------+



In [62]:
from pyspark.sql.functions import *

# Add the squared age as a new column, then check the schema and a sample.
df = df.withColumn("age_square", df["age"] ** 2)

df.printSchema()

df.select("x", "age", "age_square").show()

root
 |-- x: integer (nullable = true)
 |-- age: float (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: float (nullable = true)
 |-- education: string (nullable = true)
 |-- educational-num: float (nullable = true)
 |-- marital-status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- capital-gain: float (nullable = true)
 |-- capital-loss: float (nullable = true)
 |-- hours-per-week: float (nullable = true)
 |-- native-country: string (nullable = true)
 |-- income: string (nullable = true)
 |-- age_square: double (nullable = true)

+---+----+----------+
|  x| age|age_square|
+---+----+----------+
|  1|25.0|     625.0|
|  2|38.0|    1444.0|
|  3|28.0|     784.0|
|  4|44.0|    1936.0|
|  5|18.0|     324.0|
|  6|34.0|    1156.0|
|  7|29.0|     841.0|
|  8|63.0|    3969.0|
|  9|24.0|     576.0|
| 10|55.0|    3025.0|
| 11|65.0|  

In [72]:
# Column order for the working DataFrame: same columns as before, with
# age_square moved directly after age.
COLUMNAS = [
    "x", "age", "age_square",
    "workclass", "fnlwgt",
    "education", "educational-num",
    "marital-status", "occupation", "relationship",
    "race", "gender",
    "capital-gain", "capital-loss", "hours-per-week",
    "native-country", "income",
]

df = df.select(*COLUMNAS)

df.printSchema()

root
 |-- x: integer (nullable = true)
 |-- age: float (nullable = true)
 |-- age_square: double (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: float (nullable = true)
 |-- education: string (nullable = true)
 |-- educational-num: float (nullable = true)
 |-- marital-status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- capital-gain: float (nullable = true)
 |-- capital-loss: float (nullable = true)
 |-- hours-per-week: float (nullable = true)
 |-- native-country: string (nullable = true)
 |-- income: string (nullable = true)



In [73]:
# Number of rows per native country, smallest groups first.
df.groupby('native-country').count().sort('count').show()

+--------------------+-----+
|      native-country|count|
+--------------------+-----+
|  Holand-Netherlands|    1|
|             Hungary|   19|
|            Honduras|   20|
|            Scotland|   21|
|                Laos|   23|
|          Yugoslavia|   23|
|Outlying-US(Guam-...|   23|
|     Trinadad&Tobago|   27|
|            Cambodia|   28|
|                Hong|   30|
|            Thailand|   30|
|             Ireland|   37|
|              France|   38|
|             Ecuador|   45|
|                Peru|   46|
|              Greece|   49|
|           Nicaragua|   49|
|                Iran|   59|
|              Taiwan|   65|
|            Portugal|   67|
+--------------------+-----+
only showing top 20 rows



In [77]:
# Exclude the 'Holand-Netherlands' rows, then list the remaining countries by
# frequency (up to 100 rows, so the listing is not cut off at 20).
not_holand_netherlands = df["native-country"] != "Holand-Netherlands"
df_remove = df.where(not_holand_netherlands)

df_remove.groupBy("native-country").count().orderBy("count").show(100)

+--------------------+-----+
|      native-country|count|
+--------------------+-----+
|             Hungary|   19|
|            Honduras|   20|
|            Scotland|   21|
|                Laos|   23|
|          Yugoslavia|   23|
|Outlying-US(Guam-...|   23|
|     Trinadad&Tobago|   27|
|            Cambodia|   28|
|            Thailand|   30|
|                Hong|   30|
|             Ireland|   37|
|              France|   38|
|             Ecuador|   45|
|                Peru|   46|
|           Nicaragua|   49|
|              Greece|   49|
|                Iran|   59|
|              Taiwan|   65|
|            Portugal|   67|
|               Haiti|   75|
|            Columbia|   85|
|             Vietnam|   86|
|              Poland|   87|
|           Guatemala|   88|
|               Japan|   92|
|  Dominican-Republic|  103|
|               Italy|  105|
|             Jamaica|  106|
|               South|  115|
|               China|  122|
|             England|  127|
|             

In [91]:
# Collect the distinct native-country values to the driver and print them,
# one per line. Note that this reads from df, not from the filtered df_remove.
# NOTE(review): the printed values include a "?" entry, presumably a
# placeholder for an unknown country; confirm how it should be handled.
paises = df.select('native-country').distinct().collect()

for pais in paises:
    print(pais['native-country'])

Philippines
Germany
Cambodia
France
Greece
Taiwan
Ecuador
Nicaragua
Hong
Peru
India
China
Italy
Holand-Netherlands
Cuba
South
Iran
Ireland
Thailand
Laos
El-Salvador
Mexico
Guatemala
Honduras
Yugoslavia
Puerto-Rico
Jamaica
Canada
United-States
Dominican-Republic
Outlying-US(Guam-USVI-etc)
Japan
England
Haiti
Poland
Portugal
?
Columbia
Scotland
Hungary
Vietnam
Trinadad&Tobago


In [100]:
# Collect the per-country row counts (ascending by count) into a Python list
# of Row objects so they can be formatted manually in the next cell.
var = df.groupby('native-country').count().sort('count').collect()

var

[Row(native-country='Holand-Netherlands', count=1),
 Row(native-country='Hungary', count=19),
 Row(native-country='Honduras', count=20),
 Row(native-country='Scotland', count=21),
 Row(native-country='Laos', count=23),
 Row(native-country='Yugoslavia', count=23),
 Row(native-country='Outlying-US(Guam-USVI-etc)', count=23),
 Row(native-country='Trinadad&Tobago', count=27),
 Row(native-country='Cambodia', count=28),
 Row(native-country='Hong', count=30),
 Row(native-country='Thailand', count=30),
 Row(native-country='Ireland', count=37),
 Row(native-country='France', count=38),
 Row(native-country='Ecuador', count=45),
 Row(native-country='Peru', count=46),
 Row(native-country='Greece', count=49),
 Row(native-country='Nicaragua', count=49),
 Row(native-country='Iran', count=59),
 Row(native-country='Taiwan', count=65),
 Row(native-country='Portugal', count=67),
 Row(native-country='Haiti', count=75),
 Row(native-country='Columbia', count=85),
 Row(native-country='Vietnam', count=86),
 Ro

In [115]:
# Right-align the country names in a column wide enough for the longest one.
# The previous fixed width of 25 was shorter than the longest name
# ('Outlying-US(Guam-USVI-etc)', 26 characters), which pushed that row's count
# out of line with the others. 25 is kept as the minimum width.
name_width = max([25] + [len(str(v['native-country'])) for v in var])

for v in var:
    # str() keeps the loop working even if a country value were missing (None).
    print('{:>{width}} \t {}'.format(str(v['native-country']), v['count'], width=name_width))

       Holand-Netherlands 	 1
                  Hungary 	 19
                 Honduras 	 20
                 Scotland 	 21
                     Laos 	 23
               Yugoslavia 	 23
Outlying-US(Guam-USVI-etc) 	 23
          Trinadad&Tobago 	 27
                 Cambodia 	 28
                     Hong 	 30
                 Thailand 	 30
                  Ireland 	 37
                   France 	 38
                  Ecuador 	 45
                     Peru 	 46
                   Greece 	 49
                Nicaragua 	 49
                     Iran 	 59
                   Taiwan 	 65
                 Portugal 	 67
                    Haiti 	 75
                 Columbia 	 85
                  Vietnam 	 86
                   Poland 	 87
                Guatemala 	 88
                    Japan 	 92
       Dominican-Republic 	 103
                    Italy 	 105
                  Jamaica 	 106
                    South 	 115
                    China 	 122
                  England 	 127
  

In [132]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler


# Every string-typed column except the label column is treated as categorical.
# The label is excluded by exact name; the previous substring test
# (`"income" in name`) would also have skipped any other column whose name
# merely contains "income".
CATE_FEATURES = [name for name, dtype in df.dtypes
                 if dtype == "string" and name != "income"]

# Pipeline stages are accumulated here and extended by the following cells.
stages = []

# For each categorical column: index the string values ("<col>Index"), then
# one-hot encode that index ("<col>classVec").
for categoricalCol in CATE_FEATURES:
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + "Index")
    encoder = OneHotEncoder(inputCols=[categoricalCol + "Index"], outputCols=[categoricalCol + "classVec"])
    stages += [stringIndexer, encoder]


In [139]:
# Index the income label into a numeric column named "newlabel" and add the
# indexer to the pipeline stages.
# NOTE: re-running this cell appends the stage to `stages` a second time.
labelstringIdx = StringIndexer(inputCol="income", outputCol="newlabel")
stages.append(labelstringIdx)

In [141]:
# Inputs of the final feature vector: the one-hot vectors of the categorical
# columns followed by the continuous columns.
# NOTE(review): CONTI_FEATURES is built before age_square is added to df, so
# age_square is not among these inputs; confirm whether that is intended.
assemblerInputs = [c + "classVec" for c in CATE_FEATURES] + CONTI_FEATURES

In [145]:
# Combine all input columns into a single vector column named 'features'.
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol='features')


In [146]:
# The assembler is added after the indexers and encoders that were appended
# to `stages` earlier.
# NOTE: re-running this cell appends the assembler to `stages` a second time.
stages.append(assembler)

In [155]:
# Create the preprocessing pipeline from the accumulated stages, fit it, and
# apply it to the data.

# Drop the "x" column before fitting; it is not among the assembler inputs.
df_remove = df_remove.drop("x")

pipeline = Pipeline(stages=stages)
# NOTE(review): the pipeline is fitted on all rows of df_remove; the train/test
# split is only made later, on the transformed data.
pipelineModel = pipeline.fit(df_remove)
# Despite its name, `model` is the transformed DataFrame, not a fitted model.
model = pipelineModel.transform(df_remove)


In [162]:
# Inspect the first transformed row: original columns plus the index,
# one-hot, label and assembled feature columns added by the pipeline.
model.take(1)

[Row(age=25.0, age_square=625.0, workclass='Private', fnlwgt=226802.0, education='11th', educational-num=7.0, marital-status='Never-married', occupation='Machine-op-inspct', relationship='Own-child', race='Black', gender='Male', capital-gain=0.0, capital-loss=0.0, hours-per-week=40.0, native-country='United-States', income='<=50K', workclassIndex=0.0, workclassclassVec=SparseVector(8, {0: 1.0}), educationIndex=5.0, educationclassVec=SparseVector(15, {5: 1.0}), marital-statusIndex=1.0, marital-statusclassVec=SparseVector(6, {1: 1.0}), occupationIndex=6.0, occupationclassVec=SparseVector(14, {6: 1.0}), relationshipIndex=2.0, relationshipclassVec=SparseVector(5, {2: 1.0}), raceIndex=1.0, raceclassVec=SparseVector(4, {1: 1.0}), genderIndex=0.0, genderclassVec=SparseVector(1, {0: 1.0}), native-countryIndex=0.0, native-countryclassVec=SparseVector(40, {0: 1.0}), newlabel=0.0, features=SparseVector(99, {0: 1.0, 13: 1.0, 24: 1.0, 35: 1.0, 45: 1.0, 49: 1.0, 52: 1.0, 53: 1.0, 93: 25.0, 94: 22680

In [164]:
from pyspark.ml.linalg import DenseVector

# Reduce every transformed row to a (label, features) pair, converting the
# features value to a DenseVector.
input_data = model.rdd.map(lambda x: (x["newlabel"], DenseVector(x["features"])))

In [166]:
# Build a two-column DataFrame ("label", "features") from the pairs.
# NOTE: despite its name, df_train holds all rows; it is split into train and
# test sets in a later cell.
df_train = sqlContext.createDataFrame(input_data, ["label", "features"])


In [169]:
# Preview the first five (label, features) rows.
df_train.show(5)

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|[1.0,0.0,0.0,0.0,...|
|  0.0|[1.0,0.0,0.0,0.0,...|
|  1.0|[0.0,0.0,1.0,0.0,...|
|  1.0|[1.0,0.0,0.0,0.0,...|
|  0.0|[0.0,0.0,0.0,1.0,...|
+-----+--------------------+
only showing top 5 rows



In [171]:
# Random 80/20 split; the fixed seed makes the split repeatable.
train_data, test_data = df_train.randomSplit([0.8, 0.2], seed=1234)

In [173]:
# Label distribution in the training set.
train_data.groupby("label").count().show()

+-----+-----+
|label|count|
+-----+-----+
|  0.0|29701|
|  1.0| 9345|
+-----+-----+



In [174]:
# Label distribution in the test set.
test_data.groupby("label").count().show()

+-----+-----+
|label|count|
+-----+-----+
|  0.0| 7453|
|  1.0| 2342|
+-----+-----+



In [175]:
from pyspark.ml.classification import LogisticRegression

# Logistic regression on the "label"/"features" columns, limited to 10
# iterations with a regularization parameter of 0.3.
lr = LogisticRegression(
    featuresCol="features",
    labelCol="label",
    regParam=0.3,
    maxIter=10,
)

# Fit on the training split only.
linearModel = lr.fit(train_data)

In [None]:
# Print the coefficients and the intercept of the fitted logistic regression
# model (linearModel).
print("Coefficients: " + str(linearModel.coefficients))
print("Intercept: " + str(linearModel.intercept))

In [176]:
# Evaluate the model: apply the fitted model to the held-out test set.

prediccions = linearModel.transform(test_data)


In [177]:
# Show the columns of the prediction DataFrame (the input columns plus the
# ones added by the model).
prediccions.printSchema()

root
 |-- label: double (nullable = true)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



In [179]:
# Keep only the columns needed to compare predictions with the true labels
# and display the first 20 rows (the default of show()).
selected = prediccions.select("label", "prediction", "probability")
selected.show()

+-----+----------+--------------------+
|label|prediction|         probability|
+-----+----------+--------------------+
|  0.0|       0.0|[0.93053780544176...|
|  0.0|       0.0|[0.94603248831750...|
|  0.0|       0.0|[0.81182553768189...|
|  0.0|       0.0|[0.91346022215182...|
|  0.0|       0.0|[0.55398815653954...|
|  0.0|       1.0|[0.28877064190246...|
|  0.0|       1.0|[0.35997365835694...|
|  0.0|       0.0|[0.90778664876464...|
|  0.0|       1.0|[0.44580363687604...|
|  0.0|       1.0|[0.34448851921895...|
|  0.0|       0.0|[0.89461993646948...|
|  0.0|       0.0|[0.85109833395415...|
|  0.0|       0.0|[0.84629201450027...|
|  0.0|       0.0|[0.93049325351024...|
|  0.0|       0.0|[0.66600206899818...|
|  0.0|       0.0|[0.75939329677079...|
|  0.0|       0.0|[0.83720480986057...|
|  0.0|       0.0|[0.82666412168648...|
|  0.0|       0.0|[0.80823811902389...|
|  0.0|       0.0|[0.84657848922666...|
+-----+----------+--------------------+
only showing top 20 rows



In [180]:
# Accuracy on the test set: share of rows whose prediction equals the label.
cm = selected.select("label", "prediction")
correct_rows = cm.where(cm["label"] == cm["prediction"]).count()
correct_rows / cm.count()

0.8250127616130679

In [183]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Score the test-set predictions with a binary classification evaluator that
# reads the "rawPrediction" column, then print which metric it computed.
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
print("evaluate", evaluator.evaluate(prediccions))
print("MetricName", evaluator.getMetricName())

evaluate 0.8893672766071866
MetricName areaUnderROC


In [None]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
# Parameter grid for cross-validation: two candidate values of lr.regParam.
grid_builder = ParamGridBuilder()
grid_builder = grid_builder.addGrid(lr.regParam, [0.01, 0.5])
paramGrid = grid_builder.build()


In [None]:
from time import *
# Measure how long the 5-fold cross-validation of `lr` over `paramGrid` takes.
start_time = time()

cv = CrossValidator(
    estimator=lr,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=5,
)

# Run the cross-validation on the training split; this step is likely to
# take a fair amount of time.
cvModel = cv.fit(train_data)

end_time = time()
elapsed_time = end_time - start_time

print("Time to train model: %.3f seconds" % elapsed_time)
