In [1]:
import findspark
findspark.init()

# RDD
An RDD (Resilient Distributed Dataset) is a collection of elements that can be partitioned across multiple nodes in a cluster and processed in parallel. It is also fault tolerant, which means it can automatically recover from failures. An RDD is immutable: once created, it cannot be changed. Instead, we apply transformations to it, and each transformation creates another RDD. Here are a few things to keep in mind about RDDs:

We can apply two types of operations to RDDs:

**Transformation**: A transformation is an operation applied to an RDD that creates a new RDD.  
**Action**: An action is an operation applied to an RDD that performs a computation and sends the result back to the driver.

*Example*: `map` (a transformation) applies a function to each element of an RDD and returns a new RDD. `reduce` (an action), in contrast, aggregates the elements of an RDD, for example the output of a map, by applying a function, and returns the result to the driver. Note that `reduceByKey`, despite its name, is a transformation: it returns a new RDD of aggregated values per key. Many more transformations and actions are defined in the Apache Spark documentation; I will discuss these in a later article.

 

RDDs use **Shared Variables**:  
Parallel operations in Apache Spark can use shared variables. This means that whenever the driver sends a task to the executors in a cluster, a copy of the shared variable is sent to each node, so that the tasks can use this variable while they run. Accumulators and broadcast variables are the two types of shared variables supported by Apache Spark.  
**Broadcast**: A broadcast variable keeps a read-only copy of the data on every node.  
**Accumulator**: An accumulator is used for aggregating information: tasks can only add to it, and only the driver can read its value.

In [16]:
import pandas as pd
from pyspark import SparkContext, SparkConf, SQLContext
sc = SparkContext()

In [3]:
data = range(1,1000)
rdd = sc.parallelize(data)
rdd.collect()

[1,
 2,
 3,
 4,
 5,
 6,
 7,
 8,
 9,
 10,
 11,
 12,
 13,
 14,
 15,
 16,
 17,
 18,
 19,
 20,
 21,
 22,
 23,
 24,
 25,
 26,
 27,
 28,
 29,
 30,
 31,
 32,
 33,
 34,
 35,
 36,
 37,
 38,
 39,
 40,
 41,
 42,
 43,
 44,
 45,
 46,
 47,
 48,
 49,
 50,
 51,
 52,
 53,
 54,
 55,
 56,
 57,
 58,
 59,
 60,
 61,
 62,
 63,
 64,
 65,
 66,
 67,
 68,
 69,
 70,
 71,
 72,
 73,
 74,
 75,
 76,
 77,
 78,
 79,
 80,
 81,
 82,
 83,
 84,
 85,
 86,
 87,
 88,
 89,
 90,
 91,
 92,
 93,
 94,
 95,
 96,
 97,
 98,
 99,
 100,
 101,
 102,
 103,
 104,
 105,
 106,
 107,
 108,
 109,
 110,
 111,
 112,
 113,
 114,
 115,
 116,
 117,
 118,
 119,
 120,
 121,
 122,
 123,
 124,
 125,
 126,
 127,
 128,
 129,
 130,
 131,
 132,
 133,
 134,
 135,
 136,
 137,
 138,
 139,
 140,
 141,
 142,
 143,
 144,
 145,
 146,
 147,
 148,
 149,
 150,
 151,
 152,
 153,
 154,
 155,
 156,
 157,
 158,
 159,
 160,
 161,
 162,
 163,
 164,
 165,
 166,
 167,
 168,
 169,
 170,
 171,
 172,
 173,
 174,
 175,
 176,
 177,
 178,
 179,
 180,
 181,
 182,
 183,
 184,
 185

In [4]:
rdd.take(2) # returns the first two elements of the rdd

[1, 2]

## Map transformation
The map transformation returns a new RDD by applying a function to each element of the base RDD.

In [9]:
data = ['Hello','I','AM','Anki','Gupta']
Rdd = sc.parallelize(data)

Rdd1 = Rdd.map(lambda x:(x,1))
Rdd1.collect()

[('Hello', 1), ('I', 1), ('AM', 1), ('Anki', 1), ('Gupta', 1)]

All transformations in Spark are lazy, which means that no computation happens on an RDD until an action needs the result.

Spark remembers which transformation is applied to which RDD with the help of a DAG (Directed Acyclic Graph). Lazy evaluation helps Spark optimize the execution, because Spark can inspect the whole DAG before actually executing the operations on the RDD. This enables Spark to run operations more efficiently.
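
The idea of lazy evaluation can be illustrated without Spark. The sketch below is a plain-Python analogy, not Spark's API: Python's built-in `map` is lazy too, so the function only runs once something consumes the result, much like a transformation only runs once an action is called. The helper `traced_square` is a hypothetical name used for illustration.

```python
calls = []

def traced_square(x):
    calls.append(x)   # record when the function really runs
    return x * x

# Like a transformation: only describes the work, runs nothing
lazy_squares = map(traced_square, range(1, 4))
calls_before = len(calls)

# Like an action: forces the computation
result = list(lazy_squares)
calls_after = len(calls)

print(calls_before, calls_after, result)  # 0 3 [1, 4, 9]
```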

## Solving a machine learning problem

### Reading a data file (csv)
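To read a csv file with Spark 1.x, we first need to download the spark-csv package (latest version) and extract it into the home directory of Spark. Then, we need to open a PySpark shell and include the package as shown below. Spark 2.0 and later ship with a built-in csv reader, so this step is not needed there.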
```
pyspark --packages com.databricks:spark-csv_2.10:1.3.0
```

A DataFrame is a distributed collection of observations (rows) with named columns, just like a table.

In [17]:
# if not using the pyspark shell, you may need to create the SparkContext first
# sc = SparkContext()
sqlContext = SQLContext(sc)

In [19]:
dir(sqlContext)

['__class__',
 '__delattr__',
 '__dict__',
 '__dir__',
 '__doc__',
 '__eq__',
 '__format__',
 '__ge__',
 '__getattribute__',
 '__gt__',
 '__hash__',
 '__init__',
 '__init_subclass__',
 '__le__',
 '__lt__',
 '__module__',
 '__ne__',
 '__new__',
 '__reduce__',
 '__reduce_ex__',
 '__repr__',
 '__setattr__',
 '__sizeof__',
 '__str__',
 '__subclasshook__',
 '__weakref__',
 '_inferSchema',
 '_instantiatedContext',
 '_jsc',
 '_jsqlContext',
 '_jvm',
 '_sc',
 '_ssql_ctx',
 'cacheTable',
 'clearCache',
 'createDataFrame',
 'createExternalTable',
 'dropTempTable',
 'getConf',
 'getOrCreate',
 'newSession',
 'range',
 'read',
 'readStream',
 'registerDataFrameAsTable',
 'registerFunction',
 'registerJavaFunction',
 'setConf',
 'sparkSession',
 'sql',
 'streams',
 'table',
 'tableNames',
 'tables',
 'udf',
 'uncacheTable']

In [18]:
# SQLContext.load() no longer exists in Spark 2.0+; use the built-in csv reader instead
train = sqlContext.read.csv('train.csv', header=True, inferSchema=True)
test = sqlContext.read.csv('test-comb.csv', header=True, inferSchema=True)

The path is the location of your train and test csv files. Setting header to True means that the csv files contain a header line with the column names. We set inferSchema to True to tell sqlContext to automatically detect the data type of each column in the data frame. If we do not set inferSchema to True, all columns are read as strings.



### Analyze the data type
To see the types of the columns in a DataFrame, we can use the method printSchema().

In [None]:
train.printSchema()

### Previewing the data set
To see the first n rows of a DataFrame, PySpark has a head() method, just like pandas. To see the number of rows in a DataFrame, we call the count() method. Note that count() behaves differently in pandas and Spark: in pandas it returns the number of non-null values per column, while in Spark it returns the number of rows.

In [None]:
train.head(10)

In [None]:
train.count()

### Impute Missing values
We can check the number of observations without null values in train and test by calling the drop() method on na and counting the remaining rows. By default, drop() drops a row if it contains any null value. We can also pass 'all' to drop a row only if all its values are null.

In [None]:
train.na.drop().count(), test.na.drop().count()

In [None]:
train = train.fillna(-1)
test = test.fillna(-1)

### Analyze numerical features

In [None]:
train.describe().show()

### Sub-setting Columns

In [None]:
train.select('User_ID').show()

### Analyze categorical features

In [None]:
train.select('Product_ID').distinct().count(), test.select('Product_ID').distinct().count()

In [None]:
diff_cat_in_train_test = test.select('Product_ID').subtract(train.select('Product_ID'))
diff_cat_in_train_test.distinct().count()

### Transforming categorical variables to labels

In [None]:
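from pyspark.ml.feature import StringIndexer
# Spark resolves column names case-insensitively by default, so the output
# column gets a name that differs from 'Product_ID' by more than letter case
plan_indexer = StringIndexer(inputCol='Product_ID',outputCol='Product_ID_index')
labeller = plan_indexer.fit(train)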

In [None]:
Train1 = labeller.transform(train)
Test1 = labeller.transform(test)

In [None]:
Train1.show()

### Selecting features

In [None]:
from pyspark.ml.feature import RFormula
formula = RFormula(formula="Purchase ~ Age+ Occupation +City_Category+Stay_In_Current_City_Years+Product_Category_1+Product_Category_2+ Gender",featuresCol="features",labelCol="label")


In [None]:
t1 = formula.fit(Train1)
train1 = t1.transform(Train1)
test1 = t1.transform(Test1)

In [None]:
train1.show()

After applying the formula, we can see that train1 and test1 have 2 extra columns called features and label, the names we specified in the formula (featuresCol="features" and labelCol="label"). The intuition is that all categorical variables are converted to numerical values in the features column of train1 and test1, while the numerical variables stay the same as before, so that ML algorithms can be applied. The Purchase variable is transformed into the label column. We can also look at the features and label columns in train1 and test1.

In [None]:
train1.select('features').show(),train1.select('label').show()

### Building a machine learning model: Random Forest

In [None]:
from pyspark.ml.regression import RandomForestRegressor
rf = RandomForestRegressor()
train_cv,test_cv = train1.randomSplit([0.7,0.3])
model1 = rf.fit(train_cv)
predictions = model1.transform(test_cv)

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator
evaluator = RegressionEvaluator()
mse = evaluator.evaluate(predictions,{evaluator.metricName:'mse'})
import numpy as np
np.sqrt(mse),mse

In [None]:
model = rf.fit(train1)
predictions1 = model.transform(test1)
df = predictions1.selectExpr('User_ID as User_ID','Product_ID as Product_ID',
                            'prediction as Purchase')
df.toPandas().to_csv('submission.csv')