# ApacheSpark

In [3]:
import findspark
findspark.init()

import pyspark
sc = pyspark.SparkContext(appName="rdd")

22/01/30 12:35:16 WARN Utils: Your hostname, IFernandes resolves to a loopback address: 127.0.1.1; using 192.168.240.163 instead (on interface eth0)
22/01/30 12:35:16 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/01/30 12:35:18 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/01/30 12:35:19 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [3]:
rdd = sc.parallelize(range(100)) # saved in ApacheSpark memory - best solution
rdd2 = range(100) # saved in local driver - worse solution for parallelization

22/01/29 20:30:44 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
22/01/29 20:30:44 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


## Functional programming
lambda functions, applying a function to a set of data. \
then parallelization takes place by taken sub samples of this list and processing them at different nodes/cores.

In [12]:
# functional programming
rdd = sc.parallelize(range(100))
rdd.map(lambda x: x+1)

PythonRDD[7] at RDD at PythonRDD.scala:53

In [15]:
rdd.take(10), rdd.map(lambda x: x+1).take(10)

([0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [1, 2, 3, 4, 5, 6, 7, 8, 9, 10])

In [17]:
sc.parallelize(range(1,101)).reduce(lambda a,b: a+b)

5050

## RDD - Resilient Distributed Dataset and DataFrames

ApacheSparkSQL

In [None]:
# accessing cloudant from ApacheSpark
cloudantdata = sparSession.read.format("com.cloudant.spark").\
option("cloudant.host", ""). \
option("cloudant.username", ""). \
option("cloudant.password", ""). \
load("device_name")

In [None]:
cloudantdata.count()

In [None]:
cloudantdata.createOrReplaceTempView("device_name")
sqlDF = spark.sql("SELECT * FROM device_name")
sqlDF.show()

## Math and Statistics on ApacheSpark

In [127]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
    
sc = SparkContext.getOrCreate(SparkConf().setMaster("local[*]"))

spark = SparkSession \
    .builder \
    .getOrCreate()

In [128]:
import random
rddX = sc.parallelize(random.sample(list(range(100)),100))
rddY = sc.parallelize(random.sample(list(range(100)),100))

In [130]:
meanX = rddX.sum()/float(rddX.count())
meanY = rddY.sum()/float(rddY.count())
print (meanX, rddX.mean())
print (meanY)

49.5 49.5
49.5


In [131]:
# standard deviation
import math
stdd = math.sqrt(rdd.map(lambda x: pow(x-meanX,2)).sum()/n)

In [132]:
# skewness
rdd.map(lambda x: pow(x-meanX,3)/pow(stdd,3)).sum()/n

# kurtosis - shape of the data and outliers content within the data
kurt = rdd.map(lambda x: pow(x-meanX,4)/pow(stdd,4)).sum()/n
kurt

1.7997599759975997

In [135]:
# Covariance and correlation
n = rddXY.count()
## covariance
rddXY = rddX.zip(rddY)
covXY = rddXY.map(lambda x_y : (x_y[0]-meanX)*(x_y[1]-meanY)).sum()/n
covXY

## correlation: corr = covarXY / (stdX * stdY)
from math import sqrt
stdX = sqrt(rddX.map(lambda x : pow(x-meanX,2)).sum()/n)
stdY = sqrt(rddY.map(lambda x : pow(x-meanY,2)).sum()/n)
print (stdX, stdY)

corrXY = covXY / (stdX*stdY)
corrXY

28.86607004772212 28.86607004772212


0.04537653765376538

In [158]:
## Correlation matrix
column1 = sc.parallelize(range(100))
column2 = sc.parallelize(range(100,200))
column3 = sc.parallelize(list(reversed(range(100))))
import random
column4 = sc.parallelize(random.sample(range(100),100))

data = column1.zip(column2).zip(column3).zip(column4).map(lambda a_b_c_d : (a_b_c_d[0][0][0],a_b_c_d[0][0][1],a_b_c_d[0][1],a_b_c_d[1]).map(lambda a_b_c_d : [a_b_c_d[0],a_b_c_d[1],a_b_c_d[2],a_b_c_d[3]])) 
#data.map(lambda a_b_c_d : [a_b_c_d[0],a_b_c_d[1],a_b_c_d[2],a_b_c_d[3]]).take(10)
from pyspark.mllib.stat import Statistics
print(Statistics.corr(data))


In [125]:
a =[1,2,3,4,5,6,7,8,9,10]
b = [7,6,5,4,5,6,7,8,9,10]
import pandas as pd
di = {"d1":a,"d2":b}
data = pd.DataFrame(di)
data.corr(), data.cov()

(          d1        d2
 d1  1.000000  0.709273
 d2  0.709273  1.000000,
           d1        d2
 d1  9.166667  4.055556
 d2  4.055556  3.566667)

## ApacheSpark run test

In [1]:
import findspark
findspark.init()

import pyspark
import random

sc = pyspark.SparkContext(appName="Pi")

num_samples = 100000000

def inside(p):     
  x, y = random.random(), random.random()
  return x*x + y*y < 1

count = sc.parallelize(range(0, num_samples)).filter(inside).count()

pi = 4 * count / num_samples
print(pi)

sc.stop()

22/01/29 14:05:34 WARN Utils: Your hostname, IFernandes resolves to a loopback address: 127.0.1.1; using 192.168.240.163 instead (on interface eth0)
22/01/29 14:05:34 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/01/29 14:05:36 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/01/29 14:05:37 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
                                                                                

3.14186988
