In [1]:
from pyspark import SparkContext
sc = SparkContext("local", "First App")

In [2]:
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)

In [3]:
counts = words.count()
print "Number of elements in RDD -> %i" % (counts)

Number of elements in RDD -> 8


In [4]:
coll = words.collect()
print "Elements in RDD -> %s" % (coll)

Elements in RDD -> ['scala', 'java', 'hadoop', 'spark', 'akka', 'spark vs hadoop', 'pyspark', 'pyspark and spark']


In [3]:
from pyspark.sql import HiveContext
hive_context = HiveContext(sc)
census = hive_context.table("adult_census")
census.show()

+----+----------------+------+------------+-------------+------------------+-----------------+--------------+-----+------+------------+------------+--------------+--------------+------+
| age|       workclass|fnlwgt|   education|education_num|    marital_status|       occupation|  relationship| race|   sex|capital_gain|capital_loss|hours_per_week|native_country|income|
+----+----------------+------+------------+-------------+------------------+-----------------+--------------+-----+------+------------+------------+--------------+--------------+------+
|null|       workclass|  null|   education|         null|    marital_status|       occupation|  relationship| race|   sex|        null|        null|          null|native_country|income|
|  22|         Private|148187|        11th|            7|     Never-married|    Other-service|Other-relative|White|  Male|           0|           0|            40| United-States| <=50K|
|  77|Self-emp-not-inc|138714|Some-college|           10|Married-civ-s

In [4]:
census.registerTempTable("census_temp")
hive_context.sql("select * from census_temp").show()

+----+----------------+------+------------+-------------+------------------+-----------------+--------------+-----+------+------------+------------+--------------+--------------+------+
| age|       workclass|fnlwgt|   education|education_num|    marital_status|       occupation|  relationship| race|   sex|capital_gain|capital_loss|hours_per_week|native_country|income|
+----+----------------+------+------------+-------------+------------------+-----------------+--------------+-----+------+------------+------------+--------------+--------------+------+
|null|       workclass|  null|   education|         null|    marital_status|       occupation|  relationship| race|   sex|        null|        null|          null|native_country|income|
|  22|         Private|148187|        11th|            7|     Never-married|    Other-service|Other-relative|White|  Male|           0|           0|            40| United-States| <=50K|
|  77|Self-emp-not-inc|138714|Some-college|           10|Married-civ-s

In [5]:
hive_context.sql("select race from census_temp").show()

+-----+
| race|
+-----+
| race|
|White|
|White|
|White|
|White|
|Black|
|White|
|Black|
|White|
|White|
|White|
|White|
|White|
|White|
|White|
|Black|
|Black|
|White|
|White|
|White|
+-----+
only showing top 20 rows



In [72]:
from pyspark.sql import SQLContext

In [73]:
sql_context=SQLContext(sc)

In [74]:
df = sql_context.sql("SELECT * FROM adult_census")

In [75]:
df.show()

+----+----------------+------+------------+-------------+------------------+-----------------+--------------+-----+------+------------+------------+--------------+--------------+------+
| age|       workclass|fnlwgt|   education|education_num|    marital_status|       occupation|  relationship| race|   sex|capital_gain|capital_loss|hours_per_week|native_country|income|
+----+----------------+------+------------+-------------+------------------+-----------------+--------------+-----+------+------------+------------+--------------+--------------+------+
|null|       workclass|  null|   education|         null|    marital_status|       occupation|  relationship| race|   sex|        null|        null|          null|native_country|income|
|  22|         Private|148187|        11th|            7|     Never-married|    Other-service|Other-relative|White|  Male|           0|           0|            40| United-States| <=50K|
|  77|Self-emp-not-inc|138714|Some-college|           10|Married-civ-s

In [76]:
df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: integer (nullable = true)
 |-- marital_status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: integer (nullable = true)
 |-- capital_loss: integer (nullable = true)
 |-- hours_per_week: integer (nullable = true)
 |-- native_country: string (nullable = true)
 |-- income: string (nullable = true)



In [77]:
df.select('marital_status').show()

+------------------+
|    marital_status|
+------------------+
|    marital_status|
|     Never-married|
|Married-civ-spouse|
|           Widowed|
|     Never-married|
|     Never-married|
|Married-civ-spouse|
|         Separated|
|Married-civ-spouse|
|Married-civ-spouse|
|     Never-married|
|Married-civ-spouse|
|Married-civ-spouse|
|Married-civ-spouse|
|Married-civ-spouse|
|     Never-married|
|Married-civ-spouse|
|     Never-married|
|Married-civ-spouse|
|Married-civ-spouse|
+------------------+
only showing top 20 rows



In [78]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression

In [79]:
from pyspark.ml.feature import StringIndexer

In [80]:
indexer = StringIndexer(inputCol = 'marital_status', outputCol = 'MSI')
indexed_df = indexer.fit(df).transform(df)
indexed_df.show()

+----+----------------+------+------------+-------------+------------------+-----------------+--------------+-----+------+------------+------------+--------------+--------------+------+---+
| age|       workclass|fnlwgt|   education|education_num|    marital_status|       occupation|  relationship| race|   sex|capital_gain|capital_loss|hours_per_week|native_country|income|MSI|
+----+----------------+------+------------+-------------+------------------+-----------------+--------------+-----+------+------------+------------+--------------+--------------+------+---+
|null|       workclass|  null|   education|         null|    marital_status|       occupation|  relationship| race|   sex|        null|        null|          null|native_country|income|7.0|
|  22|         Private|148187|        11th|            7|     Never-married|    Other-service|Other-relative|White|  Male|           0|           0|            40| United-States| <=50K|1.0|
|  77|Self-emp-not-inc|138714|Some-college|       

In [81]:
indexed_df.select('workclass').show()

+----------------+
|       workclass|
+----------------+
|       workclass|
|         Private|
|Self-emp-not-inc|
|               ?|
|         Private|
|         Private|
|       State-gov|
|         Private|
|         Private|
|    Self-emp-inc|
|               ?|
|         Private|
|Self-emp-not-inc|
|       State-gov|
|       Local-gov|
|         Private|
|     Federal-gov|
|Self-emp-not-inc|
|         Private|
|         Private|
+----------------+
only showing top 20 rows



In [82]:
indexer = StringIndexer(inputCol = 'workclass', outputCol = 'WCI')
indexed_df = indexer.fit(indexed_df).transform(indexed_df)
indexed_df.show()

+----+----------------+------+------------+-------------+------------------+-----------------+--------------+-----+------+------------+------------+--------------+--------------+------+---+---+
| age|       workclass|fnlwgt|   education|education_num|    marital_status|       occupation|  relationship| race|   sex|capital_gain|capital_loss|hours_per_week|native_country|income|MSI|WCI|
+----+----------------+------+------------+-------------+------------------+-----------------+--------------+-----+------+------------+------------+--------------+--------------+------+---+---+
|null|       workclass|  null|   education|         null|    marital_status|       occupation|  relationship| race|   sex|        null|        null|          null|native_country|income|7.0|9.0|
|  22|         Private|148187|        11th|            7|     Never-married|    Other-service|Other-relative|White|  Male|           0|           0|            40| United-States| <=50K|1.0|0.0|
|  77|Self-emp-not-inc|138714|

In [83]:
indexer = StringIndexer(inputCol = 'occupation', outputCol = 'OCI')
indexed_df = indexer.fit(indexed_df).transform(indexed_df)
indexed_df.select('OCI').show()

+----+
| OCI|
+----+
|15.0|
| 5.0|
| 4.0|
| 7.0|
| 2.0|
| 3.0|
| 1.0|
| 5.0|
| 5.0|
| 2.0|
| 7.0|
| 0.0|
| 1.0|
| 2.0|
| 1.0|
|12.0|
| 3.0|
|10.0|
| 6.0|
| 2.0|
+----+
only showing top 20 rows



In [84]:
indexer = StringIndexer(inputCol = 'relationship', outputCol = 'RLI')
indexed_df = indexer.fit(indexed_df).transform(indexed_df)
indexed_df.select('RLI').show()

+---+
|RLI|
+---+
|6.0|
|5.0|
|0.0|
|3.0|
|2.0|
|2.0|
|0.0|
|1.0|
|4.0|
|0.0|
|2.0|
|0.0|
|0.0|
|0.0|
|0.0|
|5.0|
|0.0|
|2.0|
|0.0|
|0.0|
+---+
only showing top 20 rows



In [85]:
indexer = StringIndexer(inputCol = 'income', outputCol = 'ICI')
indexed_df = indexer.fit(indexed_df).transform(indexed_df)
indexed_df.select('ICI').show()

+---+
|ICI|
+---+
|2.0|
|0.0|
|0.0|
|0.0|
|0.0|
|0.0|
|0.0|
|0.0|
|0.0|
|0.0|
|0.0|
|0.0|
|1.0|
|1.0|
|0.0|
|0.0|
|0.0|
|0.0|
|0.0|
|1.0|
+---+
only showing top 20 rows



In [86]:
indexed_df.select('hours_per_week').show()

+--------------+
|hours_per_week|
+--------------+
|          null|
|            40|
|            40|
|            40|
|            45|
|            36|
|            15|
|            40|
|            15|
|            45|
|            40|
|            50|
|            25|
|            47|
|            50|
|            40|
|            40|
|            20|
|            50|
|            50|
+--------------+
only showing top 20 rows



In [87]:
indexed_df2 = indexed_df.na.drop(how='any')

In [88]:
indexed_df2.printSchema()

root
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: integer (nullable = true)
 |-- marital_status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: integer (nullable = true)
 |-- capital_loss: integer (nullable = true)
 |-- hours_per_week: integer (nullable = true)
 |-- native_country: string (nullable = true)
 |-- income: string (nullable = true)
 |-- MSI: double (nullable = true)
 |-- WCI: double (nullable = true)
 |-- OCI: double (nullable = true)
 |-- RLI: double (nullable = true)
 |-- ICI: double (nullable = true)



In [89]:
featureassembler = VectorAssembler(inputCols=['age','WCI','education_num','MSI','OCI','RLI','hours_per_week'],
                                  outputCol='Independent Features')

In [90]:
output = featureassembler.transform(indexed_df2)

In [91]:
indexed_df2.printSchema()

root
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: integer (nullable = true)
 |-- marital_status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: integer (nullable = true)
 |-- capital_loss: integer (nullable = true)
 |-- hours_per_week: integer (nullable = true)
 |-- native_country: string (nullable = true)
 |-- income: string (nullable = true)
 |-- MSI: double (nullable = true)
 |-- WCI: double (nullable = true)
 |-- OCI: double (nullable = true)
 |-- RLI: double (nullable = true)
 |-- ICI: double (nullable = true)



In [92]:
finalized_data = output.select('Independent Features','ICI')

In [93]:
finalized_data.show()

+--------------------+---+
|Independent Features|ICI|
+--------------------+---+
|[22.0,0.0,7.0,1.0...|0.0|
|[77.0,1.0,10.0,0....|0.0|
|[47.0,3.0,9.0,4.0...|0.0|
|[23.0,0.0,9.0,1.0...|0.0|
|[46.0,0.0,10.0,1....|0.0|
|[29.0,4.0,14.0,0....|0.0|
|[35.0,0.0,9.0,3.0...|0.0|
|[28.0,0.0,13.0,0....|0.0|
|[58.0,5.0,9.0,0.0...|0.0|
|[19.0,3.0,9.0,1.0...|0.0|
|(7,[0,2,6],[32.0,...|0.0|
|[67.0,1.0,15.0,0....|1.0|
|[47.0,4.0,14.0,0....|1.0|
|[36.0,2.0,13.0,0....|0.0|
|[29.0,0.0,9.0,1.0...|0.0|
|[26.0,6.0,10.0,0....|0.0|
|[17.0,1.0,7.0,1.0...|0.0|
|[28.0,0.0,9.0,0.0...|0.0|
|[37.0,0.0,13.0,0....|1.0|
|[60.0,3.0,9.0,0.0...|1.0|
+--------------------+---+
only showing top 20 rows



In [101]:
train_data,test_data = finalized_data.randomSplit([.75,.25])

In [102]:
regressor = LogisticRegression(featuresCol = 'Independent Features',labelCol='ICI')

In [103]:
fit_model = regressor.fit(train_data)
results = fit_model.transform(test_data)

In [105]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
my_eval = BinaryClassificationEvaluator(rawPredictionCol='prediction',
                                       labelCol='ICI')
results.select('ICI','prediction')
AUC = my_eval.evaluate(results)
print("AUC score is : ",AUC)

('AUC score is : ', 0.7003983987957225)
