<h1>Import and install of spark</h1>

In [1]:
#### This cell is to make spark work on a windows laptop
import os
import sys

# Path for spark source folder
os.environ['SPARK_HOME']="C:\spark-2.0.1-bin-hadoop2.7"

# Append pyspark  to Python Path
sys.path.append("C:\spark-2.0.1-bin-hadoop2.7\python")
sys.path.append("C:\spark-2.0.1-bin-hadoop2.7\python\lib\py4j-0.10.3-src.zip")
#os.environ['SPARK_EXECUTOR_MEMORY']="5G"

try:
    from pyspark import SparkContext
    from pyspark import SparkConf
    from pyspark.sql import SQLContext
    print ("Successfully imported Spark Modules")

except ImportError as e:
    print ("Can not import Spark Modules", e)
    sys.exit(1)

# Initialize SparkContext
sc = SparkContext('local')
words = sc.parallelize(["scala","java","hadoop","spark","akka"])
print (words.count())
print(words.countByValue())

Successfully imported Spark Modules
5
defaultdict(<class 'int'>, {'java': 1, 'spark': 1, 'scala': 1, 'hadoop': 1, 'akka': 1})


In [2]:
from pyspark.sql import types
import pandas as pd
%matplotlib inline
import matplotlib.pyplot as plt
import matplotlib.mlab as mlab
import numpy as np

<h1>Import data and take off header</h1>

In [3]:
input_path = 'train.csv'
raw_data = sc.textFile(input_path)

print("number of rows before cleaning header:",raw_data.count())

header = raw_data.first()

cleaned_data = raw_data.filter(lambda row : row != header)

print("number of rows without header:",cleaned_data.count())
print('Number of partitions :'+str(cleaned_data.getNumPartitions()))

sqlContext = SQLContext(sc)

number of rows before cleaning header: 188319
number of rows without header: 188318
Number of partitions :2


In [4]:
names = header.split(';')[0].split(',')
print(names)
print("Length of names",len(names))

['id', 'cat1', 'cat2', 'cat3', 'cat4', 'cat5', 'cat6', 'cat7', 'cat8', 'cat9', 'cat10', 'cat11', 'cat12', 'cat13', 'cat14', 'cat15', 'cat16', 'cat17', 'cat18', 'cat19', 'cat20', 'cat21', 'cat22', 'cat23', 'cat24', 'cat25', 'cat26', 'cat27', 'cat28', 'cat29', 'cat30', 'cat31', 'cat32', 'cat33', 'cat34', 'cat35', 'cat36', 'cat37', 'cat38', 'cat39', 'cat40', 'cat41', 'cat42', 'cat43', 'cat44', 'cat45', 'cat46', 'cat47', 'cat48', 'cat49', 'cat50', 'cat51', 'cat52', 'cat53', 'cat54', 'cat55', 'cat56', 'cat57', 'cat58', 'cat59', 'cat60', 'cat61', 'cat62', 'cat63', 'cat64', 'cat65', 'cat66', 'cat67', 'cat68', 'cat69', 'cat70', 'cat71', 'cat72', 'cat73', 'cat74', 'cat75', 'cat76', 'cat77', 'cat78', 'cat79', 'cat80', 'cat81', 'cat82', 'cat83', 'cat84', 'cat85', 'cat86', 'cat87', 'cat88', 'cat89', 'cat90', 'cat91', 'cat92', 'cat93', 'cat94', 'cat95', 'cat96', 'cat97', 'cat98', 'cat99', 'cat100', 'cat101', 'cat102', 'cat103', 'cat104', 'cat105', 'cat106', 'cat107', 'cat108', 'cat109', 'cat110', '

In [5]:
cats = names[1:117]
conts = names[117:-1]

In [6]:
def create_StructField(string):
    hint = string[:3]
    if hint == "cat":
        datatype = types.IntegerType()
    elif hint == "con":
        datatype = types.FloatType()
    elif hint == "id":
        datatype = types.IntegerType()
    elif hint == "los":
        datatype = types.FloatType()
    else:
        raise ValueError("Can\'t read this string:" + hint )

    return types.StructField(string, datatype, False)

In [7]:
structField_list = [create_StructField(string) for string in names]

In [8]:
data_schema = types.StructType(structField_list)

In [9]:
def tryeval(val,column_number):
    if column_number == 0:
        return int(val)
    elif 1 <= column_number <= 116:
        return val
    elif 117 <= column_number <= 131:
        return float(val)
    else:
        raise Exception("There is a big problem")

def to_tuple(row):
    list_strings = row.split(',')
    return tuple(tryeval(val, n) for n, val in enumerate(list_strings))

cleaned_data_splitted = cleaned_data.map(lambda x:to_tuple(x))

    

In [10]:
def to_tuples(list_):
    return tuple((string,) for string in list_)

def fusion(x, y):
    return tuple(tuple(set(xi + yi)) for xi, yi in zip(x,y))

list_of_dictionaries = []
a = cleaned_data_splitted.map(lambda x: to_tuples(x[1:117]))
a = a.reduce(fusion)

In [11]:
sorted_tuples = tuple(tuple(sorted(tup)) for tup in a)

In [12]:
for tup in sorted_tuples:
    my_dict = dict()
    for idx, cat in enumerate(tup):
        my_dict[cat] = idx
    list_of_dictionaries.append(my_dict)

In [13]:
bListOfDictionaries = sc.broadcast(list_of_dictionaries)

In [14]:
def replace(row):
    strings = row[1:117]
    my_dicts = bListOfDictionaries.value
    tuple_of_ints = ()
    for dict_, string in zip(my_dicts, strings):
        try:
            tuple_of_ints += (dict_[string],)
        except KeyError:
            tuple_of_ints += (0.0,)
    return (row[0],) + tuple_of_ints + row[117:]

In [15]:
final_rdd = cleaned_data_splitted.map(lambda row:replace(row))

In [16]:
df = sqlContext.createDataFrame(final_rdd, schema = data_schema).coalesce(12).cache()

In [17]:
def plot_frequencies(column_idx, max_cat = 10):
    name = names[column_idx]
    a = df.groupBy(name).count().collect()
    pdf = pd.DataFrame(data=a).sort_values(0)
    l1 = pdf[0].tolist()[:10]
    l2 = pdf[1].tolist()[:10]
    plt.bar(range(len(l1)), l2)
    plt.xticks(np.arange(len(l1))+0.5,l1)
    plt.title("Frequencies of " + name)
    plt.show()

In [18]:
from pyspark.mllib.stat import KernelDensity

def plotDistribution(rdd, plot=True, numSamples=1000):
    rdd.cache()
    vmin = rdd.min()
    vmax = rdd.max()
    
    if vmin==vmax:
        return None, None
    
    stddev = rdd.stdev()
    
    domain = np.arange(vmin, vmax, (vmax-vmin)/numSamples)
    
    # a simple heuristic to select bandwidth
    bandwidth = 1.06 * stddev * pow(rdd.count(), -.2)
    
    
    kd = KernelDensity()
    kd.setSample(rdd)
    kd.setBandwidth(bandwidth)
    density = kd.estimate(domain)
    
    rdd.unpersist()
    
    # plot
    if(plot):
        plt.plot(domain, density)
        plt.xlim(0,20000)
        plt.show()
    else:
        return domain,density

In [19]:
def densityEstimation(cat_number,max_cat=5):
    
    cat_loss = df.select([cats[cat_number-1],"loss"]).rdd.cache()
    
    for integer in range(len(sorted_tuples[cat_number-1])):
        
        #Selecting the losses belonging to this category
        my_rdd = cat_loss.filter(lambda x: x[0] == integer)
        
        samples = my_rdd.map(lambda x: x[1])
        domain, density = plotDistribution(samples, False)
        
        # This is not done if the category has only one entry.
        if domain is not None and density is not None:
        
            plt.plot(domain,density, label="category: " + str(integer))

            if integer%max_cat == max_cat-1:
                plt.xlim(0,20000)
                plt.legend(loc='upper right')
                plt.title("Categorical feature N° " + str(cat_number))
                plt.show()
            
    if integer%max_cat !=max_cat-1:
        plt.xlim(0,20000)
        plt.legend(loc='upper right')
        plt.title("Categorical feature N° " + str(cat_number))
        plt.show()
    cat_loss.unpersist() 

In [20]:
def sample_and_plot(cont_idx):
    symplified_rdd = df.select([conts[cont_idx],"loss"]).rdd
    sample = symplified_rdd.takeSample(False, 2000)
    continuous_sample = []
    loss_sample = []
    for row in sample:
        continuous_sample.append(row[0])
        loss_sample.append(row[1])
    plt.scatter(continuous_sample, loss_sample, s = 0.07)
    plt.title("Continuous feature N°" + str(cont_idx+1))
    plt.show()

<h1>PART OF MAKING A PREDICTION WITH MultilayerPerceptronClassifier</h1>

In [21]:
from pyspark.ml.linalg import Vectors, VectorUDT

In [57]:
def keep_index(tup):
    result = ()
    for idx in list_indices:
        result += (os.bin(tup[idx-1]),)
        
    return result

In [23]:
df = sqlContext.createDataFrame(final_rdd, schema = data_schema).coalesce(12).cache()

In [24]:
list_indices = [i for i in range(2,15)]
last = len(names)
L = df.rdd.count()
print(last)
print(L)

132
188318


In [25]:
structField_list_selected = [create_StructField(string) for i,string in enumerate(names) if i in list_indices]

In [26]:
mean = df.rdd.map(lambda x: (x[-1])).mean()

In [27]:
df_Perceptron = sqlContext.createDataFrame(df.rdd.map(lambda x: (float(x[-1]-mean), Vectors.dense(keep_index(x)))), ["label", "features"])

In [28]:
df_Perceptron.select("label").show()

+-------------------+
|              label|
+-------------------+
| -824.1577538473625|
|  -1753.73770990205|
| -32.24759759736253|
|  -2097.48770990205|
| -273.4875878317375|
| 2105.5324316995125|
|-1905.1177147848625|
|  548.4123145120125|
|  7242.862509824512|
| 3147.2521582620125|
| 3359.5124121682625|
| 2928.3922949807625|
|-1844.2876366598625|
|-1965.5676659567375|
|-2452.1576928122063|
|-1641.8877343161125|
| 3571.9821387307625|
| -378.6377343161125|
| 1129.9821387307625|
|  760.5522070901375|
+-------------------+
only showing top 20 rows



In [43]:
df_Perceptron.select("features").first()

root
 |-- features: vector (nullable = true)



In [30]:
(df_Perceptron_train,df_Perceptron_test) = df_Perceptron.randomSplit([0.7, 0.3])

In [31]:
df_Perceptron_train.select("label").show()

+-------------------+
|              label|
+-------------------+
|-3028.9376858694573|
|-3027.3376854879875|
|-3016.3476857168694|
|-3001.3376854879875|
| -3000.887684725048|
|-2998.9376839621086|
|-2990.0576867086907|
| -2976.537686250927|
| -2975.857685945751|
|-2975.6476868612785|
|-2971.3676842672844|
|-2970.0176857931633|
|-2960.9876870138664|
|-2957.3376854879875|
|-2957.3376854879875|
|-2957.3376854879875|
|-2957.3376854879875|
|-2953.3376854879875|
| -2949.757683656933|
|-2948.3376854879875|
+-------------------+
only showing top 20 rows



In [58]:
from pyspark.ml.linalg import Vectors,VectorUDT
import pyspark.ml as ml
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

list_indices = [i for i in range(2,20)]
df_Perceptron_tuple = sqlContext.createDataFrame(df.rdd.map(lambda x: (float(x[-1]-mean),Vectors.dense(keep_index(x)))), ["label", "features"])
(df_Perceptron_tuple_train,df_Perceptron_tuple_test) = df_Perceptron_tuple.randomSplit([0.7, 0.3])
df_Perceptron.select("features").printSchema()

layers = [len(list_indices), 5, 4, 5]
# create the trainer and set its parameters
trainer = MultilayerPerceptronClassifier(maxIter=100, layers=layers, blockSize=128, seed=1234)
# train the model
model = trainer.fit(df_Perceptron_tuple_train)
# compute accuracy on the test set
result = model.transform(df_Perceptron_tuple_test)
predictionAndLabels = result.select("prediction", "label")
evaluator = MulticlassClassificationEvaluator(metricName="accuracy")
print("Accuracy: " + str(evaluator.evaluate(predictionAndLabels)))

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 49.0 failed 1 times, most recent failure: Lost task 0.0 in stage 49.0 (TID 64, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\spark-2.0.1-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 172, in main
  File "C:\spark-2.0.1-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 167, in process
  File "C:\spark-2.0.1-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\serializers.py", line 263, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "C:\spark-2.0.1-bin-hadoop2.7\python\pyspark\rdd.py", line 1306, in takeUpToNumLeft
    yield next(iterator)
  File "<ipython-input-58-6dc224e0cd34>", line 8, in <lambda>
  File "<ipython-input-57-d756d58e40fd>", line 4, in keep_index
AttributeError: module 'os' has no attribute 'bin'

	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
	at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
	at org.apache.spark.scheduler.Task.run(Task.scala:86)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.lang.Thread.run(Unknown Source)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1454)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1442)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1441)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1441)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1667)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1622)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1611)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1890)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1903)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1916)
	at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:441)
	at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
	at sun.reflect.GeneratedMethodAccessor50.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.lang.reflect.Method.invoke(Unknown Source)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Unknown Source)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\spark-2.0.1-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 172, in main
  File "C:\spark-2.0.1-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 167, in process
  File "C:\spark-2.0.1-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\serializers.py", line 263, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "C:\spark-2.0.1-bin-hadoop2.7\python\pyspark\rdd.py", line 1306, in takeUpToNumLeft
    yield next(iterator)
  File "<ipython-input-58-6dc224e0cd34>", line 8, in <lambda>
  File "<ipython-input-57-d756d58e40fd>", line 4, in keep_index
AttributeError: module 'os' has no attribute 'bin'

	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
	at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
	at org.apache.spark.scheduler.Task.run(Task.scala:86)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	... 1 more


In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator


featureIndexer =\
    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=32).fit(df_Perceptron)

# Train a DecisionTree model.
dt = DecisionTreeRegressor(featuresCol="indexedFeatures")

# Chain indexer and tree in a Pipeline
pipeline = Pipeline(stages=[featureIndexer, dt])

# Train model.  This also runs the indexer.
model = pipeline.fit(df_Perceptron_train)

# Make predictions.
predictions = model.transform(df_Perceptron_test)

# Select example rows to display.
predictions.select("prediction", "label", "features").show(100)

# Select (prediction, true label) and compute test error
evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="mae")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

treeModel = model.stages[1]
# summary only
print(treeModel)

<h1>PART OF MAKING A PREDICTION WITH ?</h1>

In [None]:
from pyspark.ml.regression import GeneralizedLinearRegression


glr = GeneralizedLinearRegression(family="gaussian", link="identity", maxIter=10, regParam=0.3)

# Fit the model
model = glr.fit(df_Perceptron)

# Print the coefficients and intercept for generalized linear regression model
print("Coefficients: " + str(model.coefficients))
print("Intercept: " + str(model.intercept))

# Summarize the model over the training set and print out some metrics
summary = model.summary
print("Coefficient Standard Errors: " + str(summary.coefficientStandardErrors))
print("T Values: " + str(summary.tValues))
print("P Values: " + str(summary.pValues))
print("Dispersion: " + str(summary.dispersion))
print("Null Deviance: " + str(summary.nullDeviance))
print("Residual Degree Of Freedom Null: " + str(summary.residualDegreeOfFreedomNull))
print("Deviance: " + str(summary.deviance))
print("Residual Degree Of Freedom: " + str(summary.residualDegreeOfFreedom))
print("AIC: " + str(summary.aic))
print("Deviance Residuals: ")
summary.residuals().show()

In [None]:
from pyspark.ml.regression import GeneralizedLinearRegression
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator
# Load training data
glr = GeneralizedLinearRegression(family="gaussian", link="identity", maxIter=10, regParam=0.3)

# Fit the model
model = glr.fit(df_Perceptron_train)
predictions = model.transform(df_Perceptron_test)

# Print the coefficients and intercept for generalized linear regression model
print("Coefficients: " + str(model.coefficients))
print("Intercept: " + str(model.intercept))

# Summarize the model over the training set and print out some metrics
summary = model.summary
print("Coefficient Standard Errors: " + str(summary.coefficientStandardErrors))
print("T Values: " + str(summary.tValues))
print("P Values: " + str(summary.pValues))
print("Dispersion: " + str(summary.dispersion))
print("Null Deviance: " + str(summary.nullDeviance))
print("Residual Degree Of Freedom Null: " + str(summary.residualDegreeOfFreedomNull))
print("Deviance: " + str(summary.deviance))
print("Residual Degree Of Freedom: " + str(summary.residualDegreeOfFreedom))
print("AIC: " + str(summary.aic))
print("Deviance Residuals: ")
summary.residuals().show()
predictions.select("prediction", "label", "features").show(100)

# Select (prediction, true label) and compute test error
evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="mae")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)


In [None]:
from pyspark.ml.linalg import Vectors
df = spark.createDataFrame([
(0.0, Vectors.dense([0.0, 0.0,0.0, 0.0])),
(1.0, Vectors.dense([0.0, 1.0,1.0, 1.0])),
(1.0, Vectors.dense([1.0, 0.0,0.0, 0.0])),
(0.0, Vectors.dense([1.0, 1.0,1.0, 1.0]))], ["label", "features"])
mlp = MultilayerPerceptronClassifier(maxIter=100, layers=[4, 2, 4], blockSize=1, seed=123)
model = mlp.fit(df)

testDF = spark.createDataFrame([
(Vectors.dense([1.0, 0.0,0.0, 0.0]),),
(Vectors.dense([1.0, 1.0,1.0, 1.0]),)], ["features"])
model.transform(testDF).show()