# Linking with Spark

In [1]:
import os
# https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.SparkConf
# https://spark.apache.org/docs/latest/submitting-applications.html
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *


sc = SparkContext("local[*]", "mty_dse")
#sc = SparkContext('local[*]')
sqlContext = SQLContext(sc)

In [2]:
# do something to prove it works
rdd_samples = sc.parallelize(range(1000))
rdd_samples.takeSample(False, 5)

[224, 288, 46, 178, 551]

### Examples

In [3]:
# Word count
src = "file:////home/jovyan/work/riccardo_nobili_tgaof.txt"
lines = sc.textFile(src)

words = lines.flatMap(lambda x: x.split(" "))
word_count = (
  words.map(lambda x: (x, 1))
            .reduceByKey(lambda x, y: x+y))
word_count.collect()

[('', 9358),
 ('etching,', 1),
 ('reinstated', 1),
 ('reminiscence', 1),
 ('connivance', 1),
 ('Antiques,', 1),
 ('lizard’s', 1),
 ('Just', 4),
 ('spectators.', 1),
 ('afford', 4),
 ('Tarentum,', 1),
 ('bust.', 1),
 ('amateurs.', 6),
 ('rare,', 8),
 ('posture,', 1),
 ('veteris', 1),
 ('φιλόαλος', 1),
 ('spoken', 2),
 ('about,', 1),
 ('501(c)(3)', 1),
 ('eating,', 1),
 ('medallions.', 1),
 ('home,”', 1),
 ('essays', 1),
 ('dressing', 1),
 ('print--the', 1),
 ('rusty', 3),
 ('Warton', 1),
 ('fall', 6),
 ('why', 8),
 ('238;', 1),
 ('pas', 2),
 ('preceding', 5),
 ('format', 4),
 ('Oil', 1),
 ('walnut-juice,', 1),
 ('suggest.', 1),
 ('suggestion.', 2),
 ('remote', 1),
 ('side.', 5),
 ('greenish', 1),
 ('furniture', 29),
 ('Brioschi', 1),
 ('(XXXVII),', 1),
 ('marks--Gold', 1),
 ('able', 33),
 ('instruments--Connoisseurship', 1),
 ('collector--Plagiarians,', 1),
 ('avoided', 1),
 ('attitude.', 1),
 ('class,', 5),
 ('errors', 3),
 ('such,', 7),
 ('scheme,', 1),
 ('talents', 3),
 ('collector--

In [4]:
money_counter = lines.filter(lambda x: "money" in x.lower())
print(money_counter.count())

38


### RDD

In [5]:
'''
Spark has three data representations
viz RDD, Dataframe, Dataset. For each data representation, Spark has a different API.RDD, Dataframe, Dataset.
'''

# Create RDD
data = range(1,1000)
rdd = sc.parallelize(data)

In [6]:
# See the contento of RDD
rdd.collect()

[1,
 2,
 3,
 4,
 5,
 6,
 7,
 8,
 9,
 10,
 11,
 12,
 13,
 14,
 15,
 16,
 17,
 18,
 19,
 20,
 21,
 22,
 23,
 24,
 25,
 26,
 27,
 28,
 29,
 30,
 31,
 32,
 33,
 34,
 35,
 36,
 37,
 38,
 39,
 40,
 41,
 42,
 43,
 44,
 45,
 46,
 47,
 48,
 49,
 50,
 51,
 52,
 53,
 54,
 55,
 56,
 57,
 58,
 59,
 60,
 61,
 62,
 63,
 64,
 65,
 66,
 67,
 68,
 69,
 70,
 71,
 72,
 73,
 74,
 75,
 76,
 77,
 78,
 79,
 80,
 81,
 82,
 83,
 84,
 85,
 86,
 87,
 88,
 89,
 90,
 91,
 92,
 93,
 94,
 95,
 96,
 97,
 98,
 99,
 100,
 101,
 102,
 103,
 104,
 105,
 106,
 107,
 108,
 109,
 110,
 111,
 112,
 113,
 114,
 115,
 116,
 117,
 118,
 119,
 120,
 121,
 122,
 123,
 124,
 125,
 126,
 127,
 128,
 129,
 130,
 131,
 132,
 133,
 134,
 135,
 136,
 137,
 138,
 139,
 140,
 141,
 142,
 143,
 144,
 145,
 146,
 147,
 148,
 149,
 150,
 151,
 152,
 153,
 154,
 155,
 156,
 157,
 158,
 159,
 160,
 161,
 162,
 163,
 164,
 165,
 166,
 167,
 168,
 169,
 170,
 171,
 172,
 173,
 174,
 175,
 176,
 177,
 178,
 179,
 180,
 181,
 182,
 183,
 184,
 185

In [7]:
# Two first elements of rdd
rdd.take(2) 

[1, 2]

In [8]:
mty = ['Monterrey', 'Nuevo Leon', 'Mexico', 'SPGG', 'Guadalupe']
rdd = sc.parallelize(mty)

In [9]:
rdd_mty = rdd.map(lambda x: (x, 1))

In [10]:
rdd_mty.collect()

[('Monterrey', 1),
 ('Nuevo Leon', 1),
 ('Mexico', 1),
 ('SPGG', 1),
 ('Guadalupe', 1)]

### DataFrames

In [11]:
people = sqlContext.read.json("people.json")

In [12]:
people.show()
print("A DataFrame is the structured version of RDD. This is the familiar relational view of the data.")

+--------------------+----------+------+---+---------------+---------+
|               email|first_name|gender| id|     ip_address|last_name|
+--------------------+----------+------+---+---------------+---------+
|mross0@woothemes.com|      Mark|  Male|  1|  193.74.224.53|     Ross|
|pbarnes1@state.tx.us|  Patricia|Female|  2|  118.198.82.58|   Barnes|
|ghenry2@geocities.jp|   Gregory|  Male|  3| 11.202.213.200|    Henry|
|mfernandez3@tmall...|      Mark|  Male|  4|   47.154.217.0|Fernandez|
| jbaker4@auda.org.au|  Jennifer|Female|  5|   39.74.105.41|    Baker|
|agilbert5@reddit.com|       Ann|Female|  6|199.219.100.148|  Gilbert|
|hriley6@yolasite.com|    Howard|  Male|  7|   10.82.75.192|    Riley|
|    sfisher7@bbb.org|    Samuel|  Male|  8|  167.52.36.254|   Fisher|
|jmccoy8@examiner.com|    Jeremy|  Male|  9|  121.43.78.116|    Mccoy|
|     jlopez9@ask.com|      Judy|Female| 10| 148.247.157.84|    Lopez|
|rcollinsa@linkedi...|    Robert|  Male| 11|  211.141.37.30|  Collins|
|cfull

In [13]:
people.printSchema()

root
 |-- email: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- id: long (nullable = true)
 |-- ip_address: string (nullable = true)
 |-- last_name: string (nullable = true)



## Machine Learning -

In [14]:
# https://spark.apache.org/docs/1.6.1/ml-classification-regression.html#survival-regression
from pyspark.ml.regression import AFTSurvivalRegression
from pyspark.mllib.linalg import Vectors

training = sqlContext.createDataFrame([
    (1.218, 1.0, Vectors.dense(1.560, -0.605)),
    (2.949, 0.0, Vectors.dense(0.346, 2.158)),
    (3.627, 0.0, Vectors.dense(1.380, 0.231)),
    (0.273, 1.0, Vectors.dense(0.520, 1.151)),
    (4.199, 0.0, Vectors.dense(0.795, -0.226))], ["label", "censor", "features"])
quantileProbabilities = [0.3, 0.6]
aft = AFTSurvivalRegression(quantileProbabilities=quantileProbabilities,
                            quantilesCol="quantiles")

model = aft.fit(training)

# Print the coefficients, intercept and scale parameter for AFT survival regression
print("Coefficients: " + str(model.coefficients))
print("Intercept: " + str(model.intercept))
print("Scale: " + str(model.scale))
model.transform(training).show(truncate=False)


Coefficients: [-0.496311758831,0.198442492722]
Intercept: 2.638095721925244
Scale: 1.5472349038898985
+-----+------+--------------+-----------------+---------------------------------------+
|label|censor|features      |prediction       |quantiles                              |
+-----+------+--------------+-----------------+---------------------------------------+
|1.218|1.0   |[1.56,-0.605] |5.7189868755951  |[1.1603249792259467,4.995462312264076] |
|2.949|0.0   |[0.346,2.158] |18.07646385624652|[3.667532904819837,15.78956131517383]  |
|3.627|0.0   |[1.38,0.231]  |7.381860525439596|[1.4977053361288317,6.447961300031173] |
|0.273|1.0   |[0.52,1.151]  |13.57759376014318|[2.7547573618163135,11.859855494592185]|
|4.199|0.0   |[0.795,-0.226]|9.013107171746727|[1.828668891765906,7.8728345023825055] |
+-----+------+--------------+-----------------+---------------------------------------+



In [15]:
sc.stop()