## Set up and loading data

In [2]:
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession
from pyspark.mllib.feature import StandardScaler

In [3]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext

config = SparkConf().setAppName("SparkAssignment").setMaster("local")
sc = SparkContext(conf = config)
sqlContext = SQLContext(sc)

In [22]:
spark=SparkSession.builder.appName('data_processing').getOrCreate()

In [65]:
df=spark.read.csv('sample_data.csv',inferSchema=True,header=True)

In [66]:
df.columns

['ratings', 'age', 'experience', 'family', 'mobile']

In [67]:
df.printSchema()

root
 |-- ratings: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- experience: double (nullable = true)
 |-- family: integer (nullable = true)
 |-- mobile: string (nullable = true)



In [68]:
df.show(5)

+-------+---+----------+------+-------+
|ratings|age|experience|family| mobile|
+-------+---+----------+------+-------+
|      3| 32|       9.0|     3|   Vivo|
|      3| 27|      13.0|     3|  Apple|
|      4| 22|       2.5|     0|Samsung|
|      4| 37|      16.5|     4|  Apple|
|      5| 27|       9.0|     1|     MI|
+-------+---+----------+------+-------+
only showing top 5 rows



In [69]:
df.select('mobile').distinct().show()

+-------+
| mobile|
+-------+
|     MI|
|   Oppo|
|Samsung|
|   Vivo|
|  Apple|
+-------+



In [70]:
df_new=df.drop('mobile')

In [71]:
df_new.show(10)

+-------+---+----------+------+
|ratings|age|experience|family|
+-------+---+----------+------+
|      3| 32|       9.0|     3|
|      3| 27|      13.0|     3|
|      4| 22|       2.5|     0|
|      4| 37|      16.5|     4|
|      5| 27|       9.0|     1|
|      4| 27|       9.0|     0|
|      5| 37|      23.0|     5|
|      5| 37|      23.0|     5|
|      3| 22|       2.5|     0|
|      3| 27|       6.0|     0|
+-------+---+----------+------+
only showing top 10 rows



In [72]:
#df_new = sqlContext.createDataFrame(df_new)
from pyspark.mllib.linalg import Vectors
scaler = StandardScaler( withStd=True, withMean=False)
rdd = df_new.rdd.map(lambda data: Vectors.dense([float(c) for c in data]))
scalerModel = scaler.fit(rdd)
scaledData = scalerModel.transform(rdd)

In [73]:
from pyspark.mllib.linalg.distributed import RowMatrix
mat = RowMatrix(scaledData)

## PCA

In [74]:
pc = mat.computePrincipalComponents(2)

In [75]:
projected = mat.multiply(pc)

In [77]:
mat.rows.collect()

[DenseVector([2.6813, 5.1736, 1.3293, 1.6262]),
 DenseVector([2.6813, 4.3652, 1.92, 1.6262]),
 DenseVector([3.575, 3.5568, 0.3692, 0.0]),
 DenseVector([3.575, 5.982, 2.437, 2.1682]),
 DenseVector([4.4688, 4.3652, 1.3293, 0.5421]),
 DenseVector([3.575, 4.3652, 1.3293, 0.0]),
 DenseVector([4.4688, 5.982, 3.397, 2.7103]),
 DenseVector([4.4688, 5.982, 3.397, 2.7103]),
 DenseVector([2.6813, 3.5568, 0.3692, 0.0]),
 DenseVector([2.6813, 4.3652, 0.8862, 0.0]),
 DenseVector([1.7875, 4.3652, 0.8862, 1.0841]),
 DenseVector([4.4688, 4.3652, 0.8862, 1.0841]),
 DenseVector([2.6813, 5.982, 2.437, 2.7103]),
 DenseVector([4.4688, 4.3652, 0.8862, 0.0]),
 DenseVector([3.575, 3.5568, 0.8862, 0.5421]),
 DenseVector([3.575, 5.982, 1.3293, 1.0841]),
 DenseVector([3.575, 4.3652, 0.8862, 0.5421]),
 DenseVector([0.8938, 5.982, 3.397, 2.7103]),
 DenseVector([1.7875, 6.7903, 3.397, 1.0841]),
 DenseVector([3.575, 5.982, 0.8862, 0.0]),
 DenseVector([4.4688, 3.5568, 0.3692, 0.0]),
 DenseVector([2.6813, 5.982, 2.437,

In [78]:
projected.rows.collect()

[DenseVector([3.8573, -4.0192]),
 DenseVector([3.7754, -3.8952]),
 DenseVector([1.3141, -4.3172]),
 DenseVector([5.0474, -5.2179]),
 DenseVector([2.4167, -5.4998]),
 DenseVector([2.3153, -4.6089]),
 DenseVector([5.7178, -6.2126]),
 DenseVector([5.7178, -6.2126]),
 DenseVector([1.5156, -3.4545]),
 DenseVector([2.2541, -3.6983]),
 DenseVector([3.0612, -2.8924]),
 DenseVector([2.4568, -5.4803]),
 DenseVector([5.5516, -4.3836]),
 DenseVector([1.8512, -5.4236]),
 DenseVector([1.9234, -4.4014]),
 DenseVector([3.7851, -5.0414]),
 DenseVector([2.3555, -4.5893]),
 DenseVector([6.5237, -2.7622]),
 DenseVector([5.8459, -3.7277]),
 DenseVector([2.9168, -4.9368]),
 DenseVector([1.1127, -5.1798]),
 DenseVector([5.5516, -4.3836]),
 DenseVector([6.5528, -4.6753]),
 DenseVector([3.3239, -2.9403]),
 DenseVector([2.3555, -4.5893]),
 DenseVector([1.5447, -5.3677]),
 DenseVector([3.0612, -2.8924]),
 DenseVector([3.631, -5.9395]),
 DenseVector([4.4126, -3.248]),
 DenseVector([2.2541, -3.6983]),
 DenseVector

## SVD

In [79]:
svd = mat.computeSVD(2, computeU=True)
U = svd.U       # The U factor is a RowMatrix.
s = svd.s       # The singular values are stored in a local dense vector.
V = svd.V       # The V factor is a local dense matrix.

In [80]:
U.rows.collect()

[DenseVector([-0.171, -0.061]),
 DenseVector([-0.1573, -0.0823]),
 DenseVector([-0.1327, 0.2082]),
 DenseVector([-0.2124, -0.1051]),
 DenseVector([-0.1731, 0.1732]),
 DenseVector([-0.1578, 0.134]),
 DenseVector([-0.2347, -0.1251]),
 DenseVector([-0.2347, -0.1251]),
 DenseVector([-0.12, 0.1338]),
 DenseVector([-0.1418, 0.0869]),
 DenseVector([-0.1344, -0.0578]),
 DenseVector([-0.1725, 0.1652]),
 DenseVector([-0.2023, -0.2147]),
 DenseVector([-0.1672, 0.2356]),
 DenseVector([-0.1391, 0.1412]),
 DenseVector([-0.1991, 0.0334]),
 DenseVector([-0.1571, 0.1261]),
 DenseVector([-0.184, -0.4224]),
 DenseVector([-0.2068, -0.2576]),
 DenseVector([-0.1906, 0.1311]),
 DenseVector([-0.1454, 0.2825]),
 DenseVector([-0.2023, -0.2147]),
 DenseVector([-0.2274, -0.2888]),
 DenseVector([-0.1376, -0.0851]),
 DenseVector([-0.1571, 0.1261]),
 DenseVector([-0.1634, 0.2674]),
 DenseVector([-0.1344, -0.0578]),
 DenseVector([-0.2134, 0.1066]),
 DenseVector([-0.1637, -0.1683]),
 DenseVector([-0.1418, 0.0869]),
 D

In [81]:
s.values

array([35.91681067,  8.09251578])

In [82]:
V.values

array([-0.50961817, -0.8008306 , -0.26199377, -0.17412331,  0.67311762,
       -0.15120319, -0.49786969, -0.52552454])

## Clustering

In [83]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

In [84]:
dataset = spark.read.format("libsvm").load("kmeans_data.txt")

In [85]:
dataset.show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|           (3,[],[])|
|  1.0|(3,[0,1,2],[0.1,0...|
|  2.0|(3,[0,1,2],[0.2,0...|
|  3.0|(3,[0,1,2],[9.0,9...|
|  4.0|(3,[0,1,2],[9.1,9...|
|  5.0|(3,[0,1,2],[9.2,9...|
+-----+--------------------+



In [86]:
kmeans = KMeans().setK(2).setSeed(1)
model = kmeans.fit(dataset)

In [87]:
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
%matplotlib inline

In [88]:
centers = model.clusterCenters()
print("Cluster Centers: ")
for center in centers:
    print(center)

Cluster Centers: 
[0.1 0.1 0.1]
[9.1 9.1 9.1]


In [90]:
predictions = model.transform(dataset)
rows = predictions.collect()
print(rows[:3])

[Row(label=0.0, features=SparseVector(3, {}), prediction=0), Row(label=1.0, features=SparseVector(3, {0: 0.1, 1: 0.1, 2: 0.1}), prediction=0), Row(label=2.0, features=SparseVector(3, {0: 0.2, 1: 0.2, 2: 0.2}), prediction=0)]


In [91]:
evaluator = ClusteringEvaluator()

silhouette = evaluator.evaluate(predictions)
print("Silhouette with squared euclidean distance = " + str(silhouette))

Silhouette with squared euclidean distance = 0.9997530305375207


## fpgrowth and prefix span

### method #1

In [4]:
from pyspark.mllib.fpm import FPGrowth

In [5]:
data = sc.textFile("fpgrowth_data.txt")


In [7]:
transactions = data.map(lambda line: line.strip().split(' '))
transactions.collect()

[['r', 'z', 'h', 'k', 'p'],
 ['z', 'y', 'x', 'w', 'v', 'u', 't', 's'],
 ['s', 'x', 'o', 'n', 'r'],
 ['x', 'z', 'y', 'm', 't', 's', 'q', 'e'],
 ['z'],
 ['x', 'z', 'y', 'r', 'q', 't', 'p']]

In [8]:
#model = FPGrowth.train(transactions, minSupport=0.2, numPartitions=10)
model = FPGrowth.train(data=transactions, minSupport=0.2, numPartitions= 10)

In [9]:
result = model.freqItemsets().collect()
for fi in result:
    print(fi)

FreqItemset(items=['z'], freq=5)
FreqItemset(items=['x'], freq=4)
FreqItemset(items=['x', 'z'], freq=3)
FreqItemset(items=['y'], freq=3)
FreqItemset(items=['y', 'x'], freq=3)
FreqItemset(items=['y', 'x', 'z'], freq=3)
FreqItemset(items=['y', 'z'], freq=3)
FreqItemset(items=['r'], freq=3)
FreqItemset(items=['r', 'x'], freq=2)
FreqItemset(items=['r', 'z'], freq=2)
FreqItemset(items=['s'], freq=3)
FreqItemset(items=['s', 'y'], freq=2)
FreqItemset(items=['s', 'y', 'x'], freq=2)
FreqItemset(items=['s', 'y', 'x', 'z'], freq=2)
FreqItemset(items=['s', 'y', 'z'], freq=2)
FreqItemset(items=['s', 'x'], freq=3)
FreqItemset(items=['s', 'x', 'z'], freq=2)
FreqItemset(items=['s', 'z'], freq=2)
FreqItemset(items=['t'], freq=3)
FreqItemset(items=['t', 'y'], freq=3)
FreqItemset(items=['t', 'y', 'x'], freq=3)
FreqItemset(items=['t', 'y', 'x', 'z'], freq=3)
FreqItemset(items=['t', 'y', 'z'], freq=3)
FreqItemset(items=['t', 's'], freq=2)
FreqItemset(items=['t', 's', 'y'], freq=2)
FreqItemset(items=['t', '

### method #2

In [25]:
from pyspark.ml.fpm import FPGrowth
from pyspark.sql.functions import split

In [27]:
data = (spark.read.text("fpgrowth_data.txt").select(split("value","\s+").alias("items")))
#.select(split("value", "\s+").alias("items")))
data.show(truncate=False)

fp = FPGrowth(minSupport=0.3, minConfidence=0.9)
fpm = fp.fit(data)
fpm.freqItemsets.show(5)
fpm.associationRules.show(5)

+------------------------+
|items                   |
+------------------------+
|[r, z, h, k, p]         |
|[z, y, x, w, v, u, t, s]|
|[s, x, o, n, r]         |
|[x, z, y, m, t, s, q, e]|
|[z]                     |
|[x, z, y, r, q, t, p]   |
+------------------------+

+---------+----+
|    items|freq|
+---------+----+
|      [s]|   3|
|   [s, x]|   3|
|[s, x, z]|   2|
|   [s, z]|   2|
|      [r]|   3|
+---------+----+
only showing top 5 rows

+----------+----------+----------+
|antecedent|consequent|confidence|
+----------+----------+----------+
|    [t, s]|       [y]|       1.0|
|    [t, s]|       [x]|       1.0|
|    [t, s]|       [z]|       1.0|
|       [p]|       [r]|       1.0|
|       [p]|       [z]|       1.0|
+----------+----------+----------+
only showing top 5 rows



## Prefix Span

In [29]:
from pyspark.mllib.fpm import PrefixSpan
from pyspark.sql import Row

In [41]:
data = [  [["a", "b"], ["c"]],
      [["a"], ["c", "b"], ["a", "b"]],
      [["a", "b"], ["e"]],
      [["f"]]]
rdd = sc.parallelize(data, 2)

model = PrefixSpan.train(rdd)
result = model.freqSequences().collect()
for fs in sorted(result):
    print('{}, {}'.format(fs.sequence,fs.freq))

[['a']], 3
[['a'], ['a']], 1
[['a'], ['b']], 1
[['a'], ['b'], ['a']], 1
[['a'], ['b'], ['b']], 1
[['a'], ['b'], ['b', 'a']], 1
[['a'], ['b', 'a']], 1
[['a'], ['b', 'c']], 1
[['a'], ['b', 'c'], ['a']], 1
[['a'], ['b', 'c'], ['b']], 1
[['a'], ['b', 'c'], ['b', 'a']], 1
[['a'], ['c']], 2
[['a'], ['c'], ['a']], 1
[['a'], ['c'], ['b']], 1
[['a'], ['c'], ['b', 'a']], 1
[['a'], ['e']], 1
[['b']], 3
[['b'], ['a']], 1
[['b'], ['b']], 1
[['b'], ['b', 'a']], 1
[['b'], ['c']], 1
[['b'], ['e']], 1
[['b', 'a']], 3
[['b', 'a'], ['c']], 1
[['b', 'a'], ['e']], 1
[['b', 'c']], 1
[['b', 'c'], ['a']], 1
[['b', 'c'], ['b']], 1
[['b', 'c'], ['b', 'a']], 1
[['c']], 2
[['c'], ['a']], 1
[['c'], ['b']], 1
[['c'], ['b', 'a']], 1
[['e']], 1
[['f']], 1


## DecisionTree Classification

In [42]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [43]:
data = spark.read.format("libsvm").load("classification_decisiontree.txt")

In [44]:
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)

In [54]:
featureIndexer =\
    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=2).fit(data)


In [55]:
(trainingData, testData) = data.randomSplit([0.7, 0.3])

In [56]:
dt = DecisionTreeClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures")

In [57]:
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, dt])

In [58]:
model = pipeline.fit(trainingData)

In [59]:
predictions = model.transform(testData)

In [60]:
predictions.select("prediction", "indexedLabel", "features").show(5)

+----------+------------+--------------------+
|prediction|indexedLabel|            features|
+----------+------------+--------------------+
|       1.0|         1.0|(4,[0,1,2,3],[4.4...|
|       1.0|         1.0|(4,[0,1,2,3],[4.5...|
|       1.0|         1.0|(4,[0,1,2,3],[4.6...|
|       1.0|         1.0|(4,[0,1,2,3],[4.8...|
|       1.0|         1.0|(4,[0,1,2,3],[4.8...|
+----------+------------+--------------------+
only showing top 5 rows



In [61]:
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g " % (1.0 - accuracy))

Test Error = 0.0444444 


In [62]:
treeModel = model.stages[2]
print(treeModel)

DecisionTreeClassificationModel (uid=DecisionTreeClassifier_45608310569fafef850e) of depth 5 with 15 nodes
