In [1]:
import pyspark
from pyspark import SparkContext

# Reuse the running context when this cell is re-executed; constructing a second
# SparkContext() raises "Cannot run multiple SparkContexts at once".
sc = SparkContext.getOrCreate()

In [2]:
# Distribute the small demo list [1, 2, 3, 4] as an RDD.
nums = sc.parallelize(list(range(1, 5)))

In [3]:
# Peek at the first element (returned as a one-element list).
nums.take(num=1)

[1]

In [4]:
# Square each element with Spark, then collect the results back to the driver.
squared = nums.map(lambda value: value * value).collect()
for num in squared:
    print('%i ' % num)

1 
4 
9 
16 


In [5]:
from pyspark.sql import Row, SQLContext

# SQL entry point used below for createDataFrame and read.csv.
sqlContext = SQLContext(sparkContext=sc)

In [6]:
list_p = [
    ('John', 19),
    ('Smith', 29),
    ('Adam', 35),
    ('Henry', 50),
]
rdd = sc.parallelize(list_p)
# Lazy preview of the tuple -> Row conversion; the cell displays the RDD object itself.
rdd.map(lambda rec: Row(name=rec[0], age=int(rec[1])))

PythonRDD[4] at RDD at PythonRDD.scala:53

In [7]:
# Turn each (name, age) tuple into a Row and let Spark infer the DataFrame schema.
ppl = rdd.map(lambda pair: Row(name=pair[0], age=int(pair[1])))
DF_ppl = sqlContext.createDataFrame(data=ppl)

In [8]:
# Show the schema Spark inferred from the Row objects.
DF_ppl.printSchema()

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



In [9]:
url = "https://raw.githubusercontent.com/guru99-edu/R-Programming/master/adult_data.csv"
from pyspark import SparkFiles
# Register the remote CSV with the Spark job; SparkFiles.get("adult_data.csv")
# resolves its local path in the next cells. `sqlContext` from the earlier cell is
# reused rather than re-created here.
sc.addFile(url)

In [10]:
adult_csv_path = SparkFiles.get("adult_data.csv")
# header=True takes column names from the first line; inferSchema=True lets Spark pick numeric types.
df = sqlContext.read.csv(adult_csv_path, header=True, inferSchema=True)

In [11]:
# Inspect the types chosen by schema inference.
df.printSchema()

root
 |-- x: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- educational-num: integer (nullable = true)
 |-- marital-status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- capital-gain: integer (nullable = true)
 |-- capital-loss: integer (nullable = true)
 |-- hours-per-week: integer (nullable = true)
 |-- native-country: string (nullable = true)
 |-- income: string (nullable = true)



In [12]:
# First five rows, with long cell values shown in full.
df.show(n=5, truncate=False)

+---+---+---------+------+------------+---------------+------------------+-----------------+------------+-----+------+------------+------------+--------------+--------------+------+
|x  |age|workclass|fnlwgt|education   |educational-num|marital-status    |occupation       |relationship|race |gender|capital-gain|capital-loss|hours-per-week|native-country|income|
+---+---+---------+------+------------+---------------+------------------+-----------------+------------+-----+------+------------+------------+--------------+--------------+------+
|1  |25 |Private  |226802|11th        |7              |Never-married     |Machine-op-inspct|Own-child   |Black|Male  |0           |0           |40            |United-States |<=50K |
|2  |38 |Private  |89814 |HS-grad     |9              |Married-civ-spouse|Farming-fishing  |Husband     |White|Male  |0           |0           |50            |United-States |<=50K |
|3  |28 |Local-gov|336951|Assoc-acdm  |12             |Married-civ-spouse|Protective-serv 

In [13]:
# Same file without schema inference: every column is read as a string.
df_string = sqlContext.read.csv(path=SparkFiles.get("adult_data.csv"), header=True, inferSchema=False)

In [14]:
# Without inferSchema every column comes back as string.
df_string.printSchema()

root
 |-- x: string (nullable = true)
 |-- age: string (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: string (nullable = true)
 |-- education: string (nullable = true)
 |-- educational-num: string (nullable = true)
 |-- marital-status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- capital-gain: string (nullable = true)
 |-- capital-loss: string (nullable = true)
 |-- hours-per-week: string (nullable = true)
 |-- native-country: string (nullable = true)
 |-- income: string (nullable = true)



In [15]:
# Import all from `sql.types`
from pyspark.sql.types import *
def convertColumn(df, names, newType):
    """Return ``df`` with every column in ``names`` cast to ``newType``.

    Parameters
    ----------
    df : DataFrame
        Source DataFrame; the result is built with ``withColumn`` and returned.
    names : iterable of str, or str
        Column names to cast. A single name may be given as a plain string.
    newType : DataType or str
        Target type handed to ``Column.cast`` (e.g. ``FloatType()``).

    Returns
    -------
    DataFrame with the listed columns replaced by their cast versions.
    """
    # A bare string would otherwise be iterated character by character.
    if isinstance(names, str):
        names = [names]
    for name in names:
        df = df.withColumn(name, df[name].cast(newType))
    return df

# Continuous feature columns (order matters: reused later for the feature vector).
CONTI_FEATURES = [
    'age', 'fnlwgt', 'capital-gain',
    'educational-num', 'capital-loss', 'hours-per-week',
]
# Cast those columns of the all-string DataFrame to float, then verify the schema.
df_string = convertColumn(df=df_string, names=CONTI_FEATURES, newType=FloatType())
df_string.printSchema()

root
 |-- x: string (nullable = true)
 |-- age: float (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: float (nullable = true)
 |-- education: string (nullable = true)
 |-- educational-num: float (nullable = true)
 |-- marital-status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- capital-gain: float (nullable = true)
 |-- capital-loss: float (nullable = true)
 |-- hours-per-week: float (nullable = true)
 |-- native-country: string (nullable = true)
 |-- income: string (nullable = true)



In [16]:
df.select(['age', 'fnlwgt']).show(n=5)

+---+------+
|age|fnlwgt|
+---+------+
| 25|226802|
| 38| 89814|
| 28|336951|
| 44|160323|
| 18|103497|
+---+------+
only showing top 5 rows



In [17]:
# Rows per education level, least frequent first.
df.groupBy("education").count().orderBy("count", ascending=True).show()

+------------+-----+
|   education|count|
+------------+-----+
|   Preschool|   83|
|     1st-4th|  247|
|     5th-6th|  509|
|   Doctorate|  594|
|        12th|  657|
|         9th|  756|
| Prof-school|  834|
|     7th-8th|  955|
|        10th| 1389|
|  Assoc-acdm| 1601|
|        11th| 1812|
|   Assoc-voc| 2061|
|     Masters| 2657|
|   Bachelors| 8025|
|Some-college|10878|
|     HS-grad|15784|
+------------+-----+



In [18]:
# count/mean/stddev/min/max for every column.
summary_stats = df.describe()
summary_stats.show()

+-------+------------------+------------------+-----------+------------------+------------+------------------+--------------+----------------+------------+------------------+------+------------------+------------------+------------------+--------------+------+
|summary|                 x|               age|  workclass|            fnlwgt|   education|   educational-num|marital-status|      occupation|relationship|              race|gender|      capital-gain|      capital-loss|    hours-per-week|native-country|income|
+-------+------------------+------------------+-----------+------------------+------------+------------------+--------------+----------------+------------+------------------+------+------------------+------------------+------------------+--------------+------+
|  count|             48842|             48842|      48842|             48842|       48842|             48842|         48842|           48842|       48842|             48842| 48842|             48842|             4884

In [19]:
df.describe(['capital-gain']).show()

+-------+------------------+
|summary|      capital-gain|
+-------+------------------+
|  count|             48842|
|   mean|1079.0676262233324|
| stddev| 7452.019057655406|
|    min|                 0|
|    max|             99999|
+-------+------------------+



In [20]:
# NOTE(review): the crosstab key column `age_income` is presumably string-typed, so this
# sort is lexicographic; it matches numeric order only while every age has two digits.
df.crosstab(col1='age', col2='income').orderBy("age_income").show()

+----------+-----+----+
|age_income|<=50K|>50K|
+----------+-----+----+
|        17|  595|   0|
|        18|  862|   0|
|        19| 1050|   3|
|        20| 1112|   1|
|        21| 1090|   6|
|        22| 1161|  17|
|        23| 1307|  22|
|        24| 1162|  44|
|        25| 1119|  76|
|        26| 1068|  85|
|        27| 1117| 115|
|        28| 1101| 179|
|        29| 1025| 198|
|        30| 1031| 247|
|        31| 1050| 275|
|        32|  957| 296|
|        33| 1045| 290|
|        34|  949| 354|
|        35|  997| 340|
|        36|  948| 400|
+----------+-----+----+
only showing top 20 rows



In [21]:
columns_to_select = [
    'age', 'workclass', 'fnlwgt', 'education',
    'educational-num', 'marital-status', 'occupation',
]
df.select(*columns_to_select).show(n=10)


+---+----------------+------+------------+---------------+------------------+-----------------+
|age|       workclass|fnlwgt|   education|educational-num|    marital-status|       occupation|
+---+----------------+------+------------+---------------+------------------+-----------------+
| 25|         Private|226802|        11th|              7|     Never-married|Machine-op-inspct|
| 38|         Private| 89814|     HS-grad|              9|Married-civ-spouse|  Farming-fishing|
| 28|       Local-gov|336951|  Assoc-acdm|             12|Married-civ-spouse|  Protective-serv|
| 44|         Private|160323|Some-college|             10|Married-civ-spouse|Machine-op-inspct|
| 18|               ?|103497|Some-college|             10|     Never-married|                ?|
| 34|         Private|198693|        10th|              6|     Never-married|    Other-service|
| 29|               ?|227026|     HS-grad|              9|     Never-married|                ?|
| 63|Self-emp-not-inc|104626| Prof-schoo

In [22]:
# Rows with age > 40, followed by the total row count.
older_than_40 = df.filter(df.age > 40).count()
print(older_than_40, df.count())

20211 48842


In [23]:
# Mean capital gain per marital status.
df.groupBy('marital-status').agg({'capital-gain': 'mean'}).show()

+--------------------+------------------+
|      marital-status| avg(capital-gain)|
+--------------------+------------------+
|           Separated| 581.8424836601307|
|       Never-married|  384.382639449029|
|Married-spouse-ab...| 629.0047770700637|
|            Divorced| 793.6755615860094|
|             Widowed| 603.6442687747035|
|   Married-AF-spouse|2971.6216216216217|
|  Married-civ-spouse|1739.7006121810625|
+--------------------+------------------+



In [24]:
# NOTE(review): this star import shadows Python builtins such as sum, max, min, abs and
# round with Spark column functions; only `col` and `asc` are used in the cells shown.
from pyspark.sql.functions import *
age_sq_expr = col("age") ** 2
df = df.withColumn("age_square", age_sq_expr)

In [25]:
df.select(["age", "age_square"]).show(n=10)

+---+----------+
|age|age_square|
+---+----------+
| 25|     625.0|
| 38|    1444.0|
| 28|     784.0|
| 44|    1936.0|
| 18|     324.0|
| 34|    1156.0|
| 29|     841.0|
| 63|    3969.0|
| 24|     576.0|
| 55|    3025.0|
+---+----------+
only showing top 10 rows



In [26]:
# Final column order: the `x` column is dropped and age_square sits right after age.
COLUMNS = [
    'age', 'age_square', 'workclass', 'fnlwgt', 'education', 'educational-num',
    'marital-status', 'occupation', 'relationship', 'race', 'gender',
    'capital-gain', 'capital-loss', 'hours-per-week', 'native-country', 'income',
]
df = df.select(*COLUMNS)
df.first()

Row(age=25, age_square=625.0, workclass='Private', fnlwgt=226802, education='11th', educational-num=7, marital-status='Never-married', occupation='Machine-op-inspct', relationship='Own-child', race='Black', gender='Male', capital-gain=0, capital-loss=0, hours-per-week=40, native-country='United-States', income='<=50K')

In [27]:
# Rows per native country, rarest first.
country_counts = df.groupBy('native-country').agg({'native-country': 'count'})
country_counts.orderBy("count(native-country)", ascending=True).show()

+--------------------+---------------------+
|      native-country|count(native-country)|
+--------------------+---------------------+
|  Holand-Netherlands|                    1|
|             Hungary|                   19|
|            Honduras|                   20|
|            Scotland|                   21|
|Outlying-US(Guam-...|                   23|
|          Yugoslavia|                   23|
|                Laos|                   23|
|     Trinadad&Tobago|                   27|
|            Cambodia|                   28|
|                Hong|                   30|
|            Thailand|                   30|
|             Ireland|                   37|
|              France|                   38|
|             Ecuador|                   45|
|                Peru|                   46|
|              Greece|                   49|
|           Nicaragua|                   49|
|                Iran|                   59|
|              Taiwan|                   65|
|         

In [28]:
from pyspark.ml.feature import StringIndexer
# Map each workclass label to a numeric index.
stringIndexer = StringIndexer(outputCol="workclass_encoded", inputCol="workclass")

In [29]:
model = stringIndexer.fit(dataset=df)

In [30]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
indexed = model.transform(df)
encoder = OneHotEncoder(dropLast=False, inputCol="workclass_encoded", outputCol="workclass_vec")
# In Spark 2.x OneHotEncoder is a plain Transformer; from Spark 3.0 it is an
# Estimator that must be fit before it can transform. Handle both.
encoder_model = encoder.fit(indexed) if hasattr(encoder, "fit") else encoder
encoded = encoder_model.transform(indexed)
encoded.select('age','workclass','income','workclass_encoded','workclass_vec').show(15)

+---+----------------+------+-----------------+-------------+
|age|       workclass|income|workclass_encoded|workclass_vec|
+---+----------------+------+-----------------+-------------+
| 25|         Private| <=50K|              0.0|(9,[0],[1.0])|
| 38|         Private| <=50K|              0.0|(9,[0],[1.0])|
| 28|       Local-gov|  >50K|              2.0|(9,[2],[1.0])|
| 44|         Private|  >50K|              0.0|(9,[0],[1.0])|
| 18|               ?| <=50K|              3.0|(9,[3],[1.0])|
| 34|         Private| <=50K|              0.0|(9,[0],[1.0])|
| 29|               ?| <=50K|              3.0|(9,[3],[1.0])|
| 63|Self-emp-not-inc|  >50K|              1.0|(9,[1],[1.0])|
| 24|         Private| <=50K|              0.0|(9,[0],[1.0])|
| 55|         Private| <=50K|              0.0|(9,[0],[1.0])|
| 65|         Private|  >50K|              0.0|(9,[0],[1.0])|
| 36|     Federal-gov| <=50K|              6.0|(9,[6],[1.0])|
| 26|         Private| <=50K|              0.0|(9,[0],[1.0])|
| 58|   

In [31]:
# Schema after adding age_square and selecting COLUMNS (the `x` column is gone).
df.printSchema()


root
 |-- age: integer (nullable = true)
 |-- age_square: double (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- educational-num: integer (nullable = true)
 |-- marital-status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- capital-gain: integer (nullable = true)
 |-- capital-loss: integer (nullable = true)
 |-- hours-per-week: integer (nullable = true)
 |-- native-country: string (nullable = true)
 |-- income: string (nullable = true)



In [32]:
from pyspark.ml import Pipeline
try:
    # Spark 2.3/2.4 name of the multi-column one-hot encoder estimator.
    from pyspark.ml.feature import OneHotEncoderEstimator
except ImportError:
    # Spark 3.0 removed OneHotEncoderEstimator; OneHotEncoder took over its estimator
    # API (inputCols/outputCols), so alias it and keep the rest of the cell unchanged.
    from pyspark.ml.feature import OneHotEncoder as OneHotEncoderEstimator
CATE_FEATURES = ['workclass', 'education', 'marital-status', 'occupation', 'relationship', 'race', 'gender', 'native-country']
stages = [] # stages in our Pipeline
# Per categorical column: index the labels, then one-hot encode the index.
for categoricalCol in CATE_FEATURES:
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + "Index")
    encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()],
                                     outputCols=[categoricalCol + "classVec"])
    stages += [stringIndexer, encoder]

In [33]:
n_stages = len(stages)
print(n_stages)

16


In [34]:
# Alternating StringIndexer / one-hot encoder stages, one pair per categorical column.
print(stages)

[StringIndexer_c2a79139c36f, OneHotEncoderEstimator_778199e27fd8, StringIndexer_4949b884d946, OneHotEncoderEstimator_c593a6c08435, StringIndexer_5a5459159e68, OneHotEncoderEstimator_9c2232cbd141, StringIndexer_77e916f2aba0, OneHotEncoderEstimator_64b41a0bf348, StringIndexer_627251703154, OneHotEncoderEstimator_a1158e12dfc8, StringIndexer_af6776d59cc1, OneHotEncoderEstimator_d02bde5124fd, StringIndexer_8dcbb1e2b113, OneHotEncoderEstimator_252ca36826df, StringIndexer_1f2ca5f4e24d, OneHotEncoderEstimator_ca5ef4c0a650]


In [35]:
# Convert label into label indices using the StringIndexer
label_stringIdx =  StringIndexer(inputCol="income", outputCol="newincome")
# Drop a "newincome" indexer left by an earlier run of this cell. Without this,
# re-running appends a duplicate stage, and the pipeline then fails at fit time
# because the output column already exists.
stages = [s for s in stages
          if not (isinstance(s, StringIndexer) and s.getOutputCol() == "newincome")]
stages += [label_stringIdx]

In [36]:
# One-hot vectors of the categorical columns first, then the raw continuous columns.
assemblerInputs = [cat + "classVec" for cat in CATE_FEATURES]
assemblerInputs.extend(CONTI_FEATURES)

In [37]:
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
# Drop a "features" assembler left by an earlier run of this cell. A duplicate
# stage writing the same output column would make the pipeline fail at fit time.
stages = [s for s in stages
          if not (isinstance(s, VectorAssembler) and s.getOutputCol() == "features")]
stages += [assembler]

In [38]:
# Create a Pipeline.
# Exclude the 'Holand-Netherlands' rows (a single row in the counts above) before fitting.
df_remove = df.where(df["native-country"] != 'Holand-Netherlands')
pipeline = Pipeline(stages=stages)
pipelineModel = pipeline.fit(df_remove)
# NOTE: despite its name, `model` holds the transformed DataFrame, not a fitted model.
model = pipelineModel.transform(df_remove)

In [39]:
model.take(num=1)

[Row(age=25, age_square=625.0, workclass='Private', fnlwgt=226802, education='11th', educational-num=7, marital-status='Never-married', occupation='Machine-op-inspct', relationship='Own-child', race='Black', gender='Male', capital-gain=0, capital-loss=0, hours-per-week=40, native-country='United-States', income='<=50K', workclassIndex=0.0, workclassclassVec=SparseVector(8, {0: 1.0}), educationIndex=5.0, educationclassVec=SparseVector(15, {5: 1.0}), marital-statusIndex=1.0, marital-statusclassVec=SparseVector(6, {1: 1.0}), occupationIndex=6.0, occupationclassVec=SparseVector(14, {6: 1.0}), relationshipIndex=2.0, relationshipclassVec=SparseVector(5, {2: 1.0}), raceIndex=1.0, raceclassVec=SparseVector(4, {1: 1.0}), genderIndex=0.0, genderclassVec=SparseVector(1, {0: 1.0}), native-countryIndex=0.0, native-countryclassVec=SparseVector(40, {0: 1.0}), newincome=0.0, features=SparseVector(99, {0: 1.0, 13: 1.0, 24: 1.0, 35: 1.0, 45: 1.0, 49: 1.0, 52: 1.0, 53: 1.0, 93: 25.0, 94: 226802.0, 96: 7.

In [40]:
from pyspark.ml.linalg import DenseVector


def _label_and_dense_features(row):
    """Return (label index, dense copy of the assembled feature vector) for one row."""
    return (row["newincome"], DenseVector(row["features"]))


input_data = model.rdd.map(_label_and_dense_features)

In [41]:
# (label, features) DataFrame for the whole dataset; it is split into train/test next.
df_train = sqlContext.createDataFrame(input_data, schema=["newincome", "features"])
df_train.show(n=2)

+---------+--------------------+
|newincome|            features|
+---------+--------------------+
|      0.0|[1.0,0.0,0.0,0.0,...|
|      0.0|[1.0,0.0,0.0,0.0,...|
+---------+--------------------+
only showing top 2 rows



In [42]:
# 80/20 train/test split; the fixed seed keeps the split reproducible.
train_data, test_data = df_train.randomSplit(weights=[0.8, 0.2], seed=1234)

In [43]:
# Training split schema: the `newincome` label plus the `features` vector.
train_data.printSchema()

root
 |-- newincome: double (nullable = true)
 |-- features: vector (nullable = true)



In [44]:
# Class balance in the training split.
train_data.groupBy('newincome').agg({'newincome': 'count'}).show()

+---------+----------------+
|newincome|count(newincome)|
+---------+----------------+
|      0.0|           29652|
|      1.0|            9326|
+---------+----------------+



In [50]:
# Class balance in the test split.
test_data.groupBy('newincome').agg({'newincome': 'count'}).show()

+---------+----------------+
|newincome|count(newincome)|
+---------+----------------+
|      0.0|            7502|
|      1.0|            2361|
+---------+----------------+



In [47]:
# The test split comes from the same randomSplit, so it shares the training schema.
test_data.printSchema()

root
 |-- newincome: double (nullable = true)
 |-- features: vector (nullable = true)



In [49]:
test_size = test_data.count()
print(test_size)

9863
