In [1]:
#import pyspark
from pyspark import SparkContext
sc = SparkContext()
nums = sc.parallelize([13,2,3,4,5,6,7,8,9])#giving the data# formation of data
nums.take(3)#indexed values are printed 
# index starts from 1 here
squared = nums.map(lambda x: x*x).collect()
for num in squared:
    print('%i' %(num))
    

169
4
9
16
25
36
49
64
81


### SQLContext a more convenient way is to use the DataFrame. 
SparkContext is already set, you can use it to create the DataFrame. You also need to declare the SQLContext. SQLContext allows connecting the engine with different data sources. It is used to initiate the functionalities of Spark SQL.


In [2]:
from pyspark.sql import Row#using the SQL because loading a big file SQL helps, CSV is limited to only 10lakhs data row
#over it, CSV doesn't work
from pyspark.sql import SQLContext
from pyspark import SparkContext

In [3]:
sc.stop()
sc = SparkContext()
sqlContext = SQLContext(sc)

In [4]:
list_p = [('jhon',19),('smith',29),('Adam',35),('Henry',50)]
list_p

[('jhon', 19), ('smith', 29), ('Adam', 35), ('Henry', 50)]

### Build a RDD ( Resilient Distributed Dataset )

In [5]:
rdd = sc.parallelize(list_p)

In [6]:
ppl = rdd.map(lambda x: Row(name = x[0], age = int(x[1])))

In [7]:
#create a DataFrame context
#sqlContext.createDataFrame(ppl)
#list_p = [('jhon',19),('smith',29),('Adam',35),('Henry',50)]
#rdd = sc.parallelize(list_p)
DF_ppl = sqlContext.createDataFrame(ppl)

In [8]:
DF_ppl.printSchema()

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



In [9]:
DF_ppl.show()

+---+-----+
|age| name|
+---+-----+
| 19| jhon|
| 29|smith|
| 35| Adam|
| 50|Henry|
+---+-----+



In [10]:
#from pyspark.sql import SQLContext
url = "https://raw.githubusercontent.com/thomaspernet/data_csv_r/master/data/adult.csv"
from pyspark import SparkFiles

In [11]:
sc.addFile(url)
sqlContext = SQLContext(sc)

In [12]:
df = sqlContext.read.csv(SparkFiles.get("adult.csv"),header = True, inferSchema = True)

In [13]:
df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: integer (nullable = true)
 |-- marital: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: integer (nullable = true)
 |-- capital_loss: integer (nullable = true)
 |-- hours_week: integer (nullable = true)
 |-- native_country: string (nullable = true)
 |-- label: string (nullable = true)



In [14]:
df.show(2)

+---+----------------+------+---------+-------------+------------------+---------------+-------------+-----+----+------------+------------+----------+--------------+-----+
|age|       workclass|fnlwgt|education|education_num|           marital|     occupation| relationship| race| sex|capital_gain|capital_loss|hours_week|native_country|label|
+---+----------------+------+---------+-------------+------------------+---------------+-------------+-----+----+------------+------------+----------+--------------+-----+
| 39|       State-gov| 77516|Bachelors|           13|     Never-married|   Adm-clerical|Not-in-family|White|Male|        2174|           0|        40| United-States|<=50K|
| 50|Self-emp-not-inc| 83311|Bachelors|           13|Married-civ-spouse|Exec-managerial|      Husband|White|Male|           0|           0|        13| United-States|<=50K|
+---+----------------+------+---------+-------------+------------------+---------------+-------------+-----+----+------------+------------+-

In [15]:
#To convert the con
from pyspark.sql.types import *

In [16]:
#write a custom function to convert the data type of DataFrame columns
def convertColumn(df,names,newType):
    for name in names:
        df = df.withColumn(name,df[name].cast(newType))
    return df
#list of continuous features 
CONTI_FEATURES = ['age','fnlwgt','capital_gain','education_num','capital_loss','hours_week']
#convert the types
df = convertColumn(df, CONTI_FEATURES, FloatType())
#check the dataset
df.printSchema()

root
 |-- age: float (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: float (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: float (nullable = true)
 |-- marital: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: float (nullable = true)
 |-- capital_loss: float (nullable = true)
 |-- hours_week: float (nullable = true)
 |-- native_country: string (nullable = true)
 |-- label: string (nullable = true)



In [17]:
df.select('age','fnlwgt','education').show()#top 20 view#it also depends on arguments
#df.select('*').show()

+----+--------+------------+
| age|  fnlwgt|   education|
+----+--------+------------+
|39.0| 77516.0|   Bachelors|
|50.0| 83311.0|   Bachelors|
|38.0|215646.0|     HS-grad|
|53.0|234721.0|        11th|
|28.0|338409.0|   Bachelors|
|37.0|284582.0|     Masters|
|49.0|160187.0|         9th|
|52.0|209642.0|     HS-grad|
|31.0| 45781.0|     Masters|
|42.0|159449.0|   Bachelors|
|37.0|280464.0|Some-college|
|30.0|141297.0|   Bachelors|
|23.0|122272.0|   Bachelors|
|32.0|205019.0|  Assoc-acdm|
|40.0|121772.0|   Assoc-voc|
|34.0|245487.0|     7th-8th|
|25.0|176756.0|     HS-grad|
|32.0|186824.0|     HS-grad|
|38.0| 28887.0|        11th|
|43.0|292175.0|     Masters|
+----+--------+------------+
only showing top 20 rows



In [18]:
from pyspark.ml.feature import StringIndexer
#stringIndexer = StringIndexer(inputCol = "label",outputCol = "newLabel")
#model = stringIndexer.fit(df)
#df = model.transform(df)
df.printSchema()

root
 |-- age: float (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: float (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: float (nullable = true)
 |-- marital: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: float (nullable = true)
 |-- capital_loss: float (nullable = true)
 |-- hours_week: float (nullable = true)
 |-- native_country: string (nullable = true)
 |-- label: string (nullable = true)



In [19]:
df.select('age','fnlwgt','education','relationship','race','sex','native_country').show(5)
#df.select('*').show()

+----+--------+---------+-------------+-----+------+--------------+
| age|  fnlwgt|education| relationship| race|   sex|native_country|
+----+--------+---------+-------------+-----+------+--------------+
|39.0| 77516.0|Bachelors|Not-in-family|White|  Male| United-States|
|50.0| 83311.0|Bachelors|      Husband|White|  Male| United-States|
|38.0|215646.0|  HS-grad|Not-in-family|White|  Male| United-States|
|53.0|234721.0|     11th|      Husband|Black|  Male| United-States|
|28.0|338409.0|Bachelors|         Wife|Black|Female|          Cuba|
+----+--------+---------+-------------+-----+------+--------------+
only showing top 5 rows



In [20]:
df.groupBy("education").count().sort("count",ascending = True).show()

+------------+-----+
|   education|count|
+------------+-----+
|   Preschool|   51|
|     1st-4th|  168|
|     5th-6th|  333|
|   Doctorate|  413|
|        12th|  433|
|         9th|  514|
| Prof-school|  576|
|     7th-8th|  646|
|        10th|  933|
|  Assoc-acdm| 1067|
|        11th| 1175|
|   Assoc-voc| 1382|
|     Masters| 1723|
|   Bachelors| 5355|
|Some-college| 7291|
|     HS-grad|10501|
+------------+-----+



In [21]:
#df.describe().show()#give the Standard Deviation #mean # min #max
df.describe('capital_gain').show()#second method of describe
#df.describe()

+-------+------------------+
|summary|      capital_gain|
+-------+------------------+
|  count|             32561|
|   mean|1077.6488437087312|
| stddev| 7385.292084840354|
|    min|               0.0|
|    max|           99999.0|
+-------+------------------+



In [22]:
df.crosstab('age','label').sort("age_label").show()

+---------+-----+----+
|age_label|<=50K|>50K|
+---------+-----+----+
|     17.0|  395|   0|
|     18.0|  550|   0|
|     19.0|  710|   2|
|     20.0|  753|   0|
|     21.0|  717|   3|
|     22.0|  752|  13|
|     23.0|  865|  12|
|     24.0|  767|  31|
|     25.0|  788|  53|
|     26.0|  722|  63|
|     27.0|  754|  81|
|     28.0|  748| 119|
|     29.0|  679| 134|
|     30.0|  690| 171|
|     31.0|  705| 183|
|     32.0|  639| 189|
|     33.0|  684| 191|
|     34.0|  643| 243|
|     35.0|  659| 217|
|     36.0|  635| 263|
+---------+-----+----+
only showing top 20 rows



In [23]:
df.drop('education_num').columns#virtual drop #actual can't be done because permission is not 
#data can't be lost

['age',
 'workclass',
 'fnlwgt',
 'education',
 'marital',
 'occupation',
 'relationship',
 'race',
 'sex',
 'capital_gain',
 'capital_loss',
 'hours_week',
 'native_country',
 'label']

In [24]:
df.filter(df.age>40).count()
#number of people greater than age 40

13443

In [25]:
df.groupby('marital').agg({'capital_gain':'mean'}).show()
#groupby can perform aggregate function
#groupBy is different

+--------------------+------------------+
|             marital| avg(capital_gain)|
+--------------------+------------------+
|           Separated| 535.5687804878049|
|       Never-married|376.58831788823363|
|Married-spouse-ab...| 653.9832535885167|
|            Divorced| 728.4148098131893|
|             Widowed| 571.0715005035247|
|   Married-AF-spouse| 432.6521739130435|
|  Married-civ-spouse|1764.8595085470085|
+--------------------+------------------+



In [26]:
from pyspark.sql.functions import *
age_square1 = df.select(col("age")**2)#select the column
df = df.withColumn("age_square",col("age")**2)#extra column is created in our data
df.printSchema()

root
 |-- age: float (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: float (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: float (nullable = true)
 |-- marital: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: float (nullable = true)
 |-- capital_loss: float (nullable = true)
 |-- hours_week: float (nullable = true)
 |-- native_country: string (nullable = true)
 |-- label: string (nullable = true)
 |-- age_square: double (nullable = true)



In [27]:
COLUMNS =['age','age_square','workclass','fnlwgt','education','education_num','marital','occupation','relationship','race','sex','capital_gain','capital_loss','hours_week','native_country','label']
df = df.select(COLUMNS)
#print(df.first())
df.show(2)

+----+----------+----------------+-------+---------+-------------+------------------+---------------+-------------+-----+----+------------+------------+----------+--------------+-----+
| age|age_square|       workclass| fnlwgt|education|education_num|           marital|     occupation| relationship| race| sex|capital_gain|capital_loss|hours_week|native_country|label|
+----+----------+----------------+-------+---------+-------------+------------------+---------------+-------------+-----+----+------------+------------+----------+--------------+-----+
|39.0|    1521.0|       State-gov|77516.0|Bachelors|         13.0|     Never-married|   Adm-clerical|Not-in-family|White|Male|      2174.0|         0.0|      40.0| United-States|<=50K|
|50.0|    2500.0|Self-emp-not-inc|83311.0|Bachelors|         13.0|Married-civ-spouse|Exec-managerial|      Husband|White|Male|         0.0|         0.0|      13.0| United-States|<=50K|
+----+----------+----------------+-------+---------+-------------+---------

In [28]:
df.filter(df.native_country == 'Holand-Netherlands').count()
df.groupby('Native_Country').agg({'native_country':'count'}).sort(asc("count(native_country)")).show()


+--------------------+---------------------+
|      Native_Country|count(native_country)|
+--------------------+---------------------+
|  Holand-Netherlands|                    1|
|            Scotland|                   12|
|            Honduras|                   13|
|             Hungary|                   13|
|Outlying-US(Guam-...|                   14|
|          Yugoslavia|                   16|
|            Thailand|                   18|
|                Laos|                   18|
|     Trinadad&Tobago|                   19|
|            Cambodia|                   19|
|                Hong|                   20|
|             Ireland|                   24|
|             Ecuador|                   28|
|              Greece|                   29|
|              France|                   29|
|                Peru|                   31|
|           Nicaragua|                   34|
|            Portugal|                   37|
|                Iran|                   43|
|         

In [29]:
df_remove = df.filter(df.native_country != 'Holand-Neterlands').show()
#df.filter(df.native_country != 'Holand-Netherlands').count()
df.groupby('native_country').agg({'native_country':'count'}).sort(asc("count(native_country)")).show()


+----+----------+----------------+--------+------------+-------------+--------------------+-----------------+-------------+------------------+------+------------+------------+----------+--------------+-----+
| age|age_square|       workclass|  fnlwgt|   education|education_num|             marital|       occupation| relationship|              race|   sex|capital_gain|capital_loss|hours_week|native_country|label|
+----+----------+----------------+--------+------------+-------------+--------------------+-----------------+-------------+------------------+------+------------+------------+----------+--------------+-----+
|39.0|    1521.0|       State-gov| 77516.0|   Bachelors|         13.0|       Never-married|     Adm-clerical|Not-in-family|             White|  Male|      2174.0|         0.0|      40.0| United-States|<=50K|
|50.0|    2500.0|Self-emp-not-inc| 83311.0|   Bachelors|         13.0|  Married-civ-spouse|  Exec-managerial|      Husband|             White|  Male|         0.0|      