In [9]:
from random import randint
list = [randint(10,1000) for x in range(0,20000000)]

In [13]:
import findspark
findspark.init()
findspark.find()
import pyspark
findspark.find()


'C:\\BigDataLocalSetup\\spark'

In [14]:
from pyspark import SparkContext,SparkConf
from pyspark.sql import SparkSession

In [17]:
sc = SparkContext.getOrCreate()
spark = SparkSession(sc)


In [24]:
from random import randint 
import time

# create a list of random numbers between 10 to 1000
my_large_list = [randint(10,1000) for x in range(0,20000000)]

# create one partition of the list  
my_large_list_one_partition = sc.parallelize(my_large_list,numSlices=1)

# check number of partitions
print(my_large_list_one_partition.getNumPartitions())
# >> 1
start_time = time.time()
# filter numbers greater than equal to 200
my_large_list_one_partition = my_large_list_one_partition.filter(lambda x : x >= 200)

# code was run in a jupyter notebook 
# to calculate the time taken to execute the following command

print("--- %s seconds ---" % (time.time() - start_time))

# count the number of elements in filtered list
print(my_large_list_one_partition.count())

1
--- 0.001001596450805664 seconds ---
16162554


In [25]:
from random import randint 
import time

# create a list of random numbers between 10 to 1000
my_large_list = [randint(10,1000) for x in range(0,20000000)]

# create one partition of the list  
my_large_list_one_partition = sc.parallelize(my_large_list,numSlices=5)

# check number of partitions
print(my_large_list_one_partition.getNumPartitions())
# >> 1
start_time = time.time()
# filter numbers greater than equal to 200
my_large_list_one_partition = my_large_list_one_partition.filter(lambda x : x >= 200)

# code was run in a jupyter notebook 
# to calculate the time taken to execute the following command

print("--- %s seconds ---" % (time.time() - start_time))

# count the number of elements in filtered list
print(my_large_list_one_partition.count())

5
--- 0.0 seconds ---
16164300


###### From the above we can see that the time taken to count and filter the values less then 200 is 0 when we use 5 slices

we can able to see ther are two types of trasformation in spark
> Narrow Transformation

> Wide Transformation

In Narrow Trasformation all the elements required to compute the single partition lies on that single partition of parent RDD

In Wide Transformation all the elements required to compute the single partition may live more than single partitions  the parent RDD

##### Lazy Evalutation

Spark  has special characteristic called lazy evaluation, where if we need to do a transformation like mapping, filtering, spliting letters on data. it will transform the data bassed on the results that we need thus reducing the unnecessary computaion

Eg:

if we have data of 1GB of 10 partition and we need to transform the data set to lower case. but we want to see the first line of the data alone. In this case spark will read the file from only first partition and perform the trasformation to give out the results  

In [28]:
list_data = [i for i in range(10,200000)]
sp_data = sc.parallelize(list_data,numSlices=5)
sp_data

ParallelCollectionRDD[15] at readRDDFromFile at PythonRDD.scala:274

###### Adding 4 to the list

In [30]:
sp_data1 = sp_data.map(lambda x: x+4)
print(sp_data1)
print(sp_data1.toDebugString())
      

PythonRDD[17] at RDD at PythonRDD.scala:53
b'(5) PythonRDD[17] at RDD at PythonRDD.scala:53 []\n |  ParallelCollectionRDD[15] at readRDDFromFile at PythonRDD.scala:274 []'


###### Again adding 20 to the list

In [31]:
sp_data2 = sp_data1.map(lambda x: x+20)
print(sp_data2)
print(sp_data1.toDebugString())

PythonRDD[18] at RDD at PythonRDD.scala:53
b'(5) PythonRDD[17] at RDD at PythonRDD.scala:53 []\n |  ParallelCollectionRDD[15] at readRDDFromFile at PythonRDD.scala:274 []'


Here it will add 24 directly insted of adding 4 and then 20, thus spark can able to define the path by itself easily 

### Spark MLlib and Local Vectors

Spark uses MLlib to perform machine learning algorithams like linear regression, logistic, dimensional reduction and other statistical calculations.


##### Local vectors

MLlib uses two vectors dense and sparse vectors

dense vectors doesnt have any index for the specific values, sparse has index for the values



In [33]:
from pyspark.mllib.linalg import Vectors

print(Vectors.dense([1,2,3,4,5,6,7,8]))

[1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0]


In [39]:
print(Vectors.sparse(10, [0,1,2,4,5], [1.0,5.0,3.0,5.0,7]))

(10,[0,1,2,4,5],[1.0,5.0,3.0,5.0,7.0])


In [41]:
print(Vectors.sparse(10, [0,1,2,4,5], [1.0,5.0,3.0,5.0,7]).toArray())

[1. 5. 3. 0. 5. 7. 0. 0. 0. 0.]


#### Labled Point

In [46]:
from pyspark.mllib.regression import LabeledPoint

point = LabeledPoint(1,Vectors.dense([1,2,3,4,5,6]))
print(point.features)
print(point.label)

[1.0,2.0,3.0,4.0,5.0,6.0]
1.0


##### Local Matrix

Local matrix are stored in single machine it supports bith sparse and dense matrix

In [50]:
from pyspark.mllib.linalg import Matrices

mat1 = Matrices.dense(3,2,[1,2,3,4,5,6])
mat2 = Matrices.sparse(3, 3, [0, 1, 2, 3], [0, 0, 2], [9, 6, 8])
print(mat2.toArray())
print(mat1.toArray())

[[9. 6. 0.]
 [0. 0. 0.]
 [0. 0. 8.]]
[[1. 4.]
 [2. 5.]
 [3. 6.]]


#### Distributed Matrices

Distributed matrices are stored in one or more RDDs. FOur types of distributed matrices are there

##### Row Matrices

Each row is a local vector, we can store rows in multiple partitions

Algorithams like random forest can be executed by this

In [51]:
from pyspark.mllib.linalg.distributed import RowMatrix

# create RDD
rows = sc.parallelize([[1,2,3], [4,5,6], [7,8,9], [10,11,12]])

# create a distributed Row Matrix
row_matrix = RowMatrix(rows)


print(row_matrix)
# >> <pyspark.mllib.linalg.distributed.RowMatrix at 0x7f425884d7f0> 

print(row_matrix.numRows())
# >> 4

print(row_matrix.numCols())

<pyspark.mllib.linalg.distributed.RowMatrix object at 0x000001D1E89E8C70>
4
3


##### Indexed Row Matrices

Here the rows are stored similar to row matrices in each partiotion but the rows are indexed in an ordered way

In [52]:
from pyspark.mllib.linalg.distributed import IndexedRow, IndexedRowMatrix

# create RDD
indexed_rows = sc.parallelize([
    IndexedRow(0, [0,1,2]),
    IndexedRow(1, [1,2,3]),
    IndexedRow(2, [3,4,5]),
    IndexedRow(3, [4,2,3]),
    IndexedRow(4, [2,2,5]),
    IndexedRow(5, [4,5,5])
])

# create IndexedRowMatrix
indexed_rows_matrix = IndexedRowMatrix(indexed_rows)

print(indexed_rows_matrix.numRows())
# >> 6

print(indexed_rows_matrix.numCols())

6
3


###### Coordinate Matrices

We use this matrices when dimensions of the matrix are very large

In [53]:

from pyspark.mllib.linalg.distributed import CoordinateMatrix, MatrixEntry

# Create an RDD of coordinate entries with the MatrixEntry class:
matrix_entries = sc.parallelize([MatrixEntry(0, 5, 2), MatrixEntry(1, 1, 1), MatrixEntry(1, 5, 4)])

# Create an CoordinateMatrix from an RDD of MatrixEntries.
c_matrix = CoordinateMatrix(matrix_entries)

# number of columns
print(c_matrix.numCols())
# >> 6

# number of rows
print(c_matrix.numRows())

6
2


##### Block Matrix

In this matrices the sub matrices are stored inside  the matrices

we need to specify the dimetion of the matrices along with a inside sub matrix

In [54]:
from pyspark.mllib.linalg import Matrices
from pyspark.mllib.linalg.distributed import BlockMatrix

# Create an RDD of sub-matrix blocks.
blocks = sc.parallelize([((0, 0), Matrices.dense(3, 3, [1, 2, 1, 2, 1, 2, 1, 2, 1])),
                         ((1, 1), Matrices.dense(3, 3, [3, 4, 5, 3, 4, 5, 3, 4, 5])),
                         ((2, 0), Matrices.dense(3, 3, [1, 1, 1, 1, 1, 1, 1, 1, 1]))])

# Create a BlockMatrix from an RDD of sub-matrix blocks  of size 3X3
b_matrix = BlockMatrix(blocks, 3, 3) 

# columns per block
print(b_matrix.colsPerBlock)
# >> 3

# rows per block
print(b_matrix.rowsPerBlock)
# >> 3

# convert the block matrix to local matrix
local_mat = b_matrix.toLocalMatrix()

# print local matrix
print(local_mat.toArray())

3
3
[[1. 2. 1. 0. 0. 0.]
 [2. 1. 2. 0. 0. 0.]
 [1. 2. 1. 0. 0. 0.]
 [0. 0. 0. 3. 3. 3.]
 [0. 0. 0. 4. 4. 4.]
 [0. 0. 0. 5. 5. 5.]
 [1. 1. 1. 0. 0. 0.]
 [1. 1. 1. 0. 0. 0.]
 [1. 1. 1. 0. 0. 0.]]
