In [0]:
                A)Summary Statistics
                B)Correlations
                C)Stratified Sampling 
                D)Hypothesis Testing
                E)Random Data Generation 

In [0]:
import numpy as np
from pyspark.mllib.stat import Statistics

In [0]:
mat = sc.parallelize([np.array([1.0, 10.0, 100.0]), np.array([2.0, 20.0, 200.0]), np.array([3.0, 30.0, 300.0])]) 
mat.collect()

Out[2]: [array([  1.,  10., 100.]),
 array([  2.,  20., 200.]),
 array([  3.,  30., 300.])]

In [0]:
summary = Statistics.colStats(mat)
print(summary.mean())  # a dense vector containing the mean value for each column

[  2.  20. 200.]


In [0]:
print(summary.variance())   

[1.e+00 1.e+02 1.e+04]


In [0]:
print(summary.numNonzeros())  

[3. 3. 3.]


In [0]:
#Finding the correlation of data between two series.
from pyspark.mllib.stat import Statistics
seriesX = sc.parallelize([1.0, 2.0, 3.0, 3.0, 5.0])  # a series

# seriesY must have the same number of partitions and cardinality as seriesX
seriesY = sc.parallelize([11.0, 22.0, 33.0, 33.0, 555.0])

In [0]:
#Entering "spearman" for Spearman's method.
# If a method is not specified, Pearson's method will be used by default.
print("Correlation is: " + str(Statistics.corr(seriesX, seriesY, method="pearson")))

Correlation is: 0.8500286768773005


In [0]:
data = sc.parallelize(
    [np.array([1.0, 10.0, 100.0]), np.array([2.0, 20.0, 200.0]), np.array([5.0, 33.0, 366.0])]
)  

In [0]:
print(Statistics.corr(data, method="pearson"))

[[1.         0.97888347 0.99038957]
 [0.97888347 1.         0.99774832]
 [0.99038957 0.99774832 1.        ]]


In [0]:
#it generates randomly sampled data.
#Finding random sample of the given data using stratified sampling
data = sc.parallelize([(1, 'a'), (1, 'b'), (2, 'c'), (2, 'd'), (2, 'e'), (3, 'f')]) # specify the exact fraction desired from each key as a dictionary
fractions = {1: 0.1, 2: 0.6, 3: 0.3}
approxSample = data.sampleByKey(False, fractions)
approxSample.collect()


Out[12]: [(2, 'd'), (2, 'e')]

In [0]:
#examining the differences between categorical variables from a random sample using ChiSquare Test.

from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
.appName("ChiSquareTestExample") \
    .getOrCreate()
from pyspark.ml.linalg import Vectors
from pyspark.ml.stat import ChiSquareTest
data = [(0.0, Vectors.dense(0.5, 10.0)),
        (0.0, Vectors.dense(1.5, 20.0)),
        (1.0, Vectors.dense(1.5, 30.0)),
        (0.0, Vectors.dense(3.5, 30.0)),
        (0.0, Vectors.dense(3.5, 40.0)),
        (1.0, Vectors.dense(3.5, 40.0))]
df = spark.createDataFrame(data, ["label", "features"])
r = ChiSquareTest.test(df, "features", "label").head()

print("pValues: " + str(r.pValues))


pValues: [0.6872892787909721,0.6822703303362126]


In [0]:
print("degreesOfFreedom: " + str(r.degreesOfFreedom))

degreesOfFreedom: [2, 3]


In [0]:
print("statistics: " + str(r.statistics))

statistics: [0.75,1.5]


In [0]:
#Generating random data using standard normal distribution
from pyspark.context import SparkContext
sc = SparkContext.getOrCreate()
from pyspark.mllib.random import RandomRDDs

In [0]:
# Generating a random double RDD that contains 1 million i.i.d. values drawn from the
# standard normal distribution `N(0, 1)`, evenly distributed in 10 partitions.
u = RandomRDDs.normalRDD(sc, 1000000, 10)

In [0]:
v = u.map(lambda x: 1.0 + 2.0 * x)
v.collect()	

Out[18]: [4.127865886958846,
 -2.1945139538340888,
 4.859881639335546,
 4.618276448181602,
 0.8630092882799323,
 -5.339764442259609,
 2.3491934742583807,
 5.212885532742656,
 2.29955033564755,
 -3.6822553545810663,
 1.0716062845053327,
 2.5704890897589303,
 0.10747084498959958,
 0.5476749296761975,
 0.14139395623247375,
 1.0635741301182307,
 1.8090180462546874,
 0.7981757019613238,
 3.089634508856559,
 2.4039468557192882,
 6.584941745164917,
 -1.1909398405657403,
 0.5806656099688132,
 -2.395635411053182,
 -1.2607411315361543,
 5.268266623630114,
 3.3358706808528034,
 -1.8056547944313857,
 -0.5849277250936948,
 0.693292198469195,
 1.0312639749517942,
 2.29041352971166,
 0.504684203153675,
 4.069973821962883,
 -0.3446158295640862,
 0.013450591805651446,
 0.11508316606670099,
 2.350675724067984,
 2.8505572287453624,
 -1.216392448345586,
 3.4190224580806223,
 1.4902796756346641,
 0.7084714767969836,
 5.387266709499492,
 0.3470905608200484,
 2.7714953071836086,
 0.04753408662016234,
 -0.088

In [0]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer

# Prepare training documents from a list of (id, text, label) tuples.
training = spark.createDataFrame([
    (0, "a b c d e spark", 1.0),
    (1, "b d", 0.0),
    (2, "spark f g h", 1.0),
    (3, "hadoop mapreduce", 0.0)], 
    ["id", "text", "label"])
df.collect()

Out[19]: [Row(label=0.0, features=DenseVector([0.5, 10.0])),
 Row(label=0.0, features=DenseVector([1.5, 20.0])),
 Row(label=1.0, features=DenseVector([1.5, 30.0])),
 Row(label=0.0, features=DenseVector([3.5, 30.0])),
 Row(label=0.0, features=DenseVector([3.5, 40.0])),
 Row(label=1.0, features=DenseVector([3.5, 40.0]))]

In [0]:
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.001)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

In [0]:
model = pipeline.fit(training)

In [0]:
test = spark.createDataFrame([
    (4, "spark i j k"),
    (5, "l m n"),
    (6, "spark hadoop spark"),
    (7, "apache hadoop")], 
    ["id", "text"])

In [0]:
prediction = model.transform(test)
selected = prediction.select("id", "text", "probability", "prediction")
for row in selected.collect():
    rid, text, prob, prediction = row
    print(
        "(%d, %s) --> prob=%s, prediction=%f" % (
            rid, text, str(prob), prediction   # type: ignore
        )
    )

(4, spark i j k) --> prob=[0.6292098489668488,0.37079015103315116], prediction=0.000000
(5, l m n) --> prob=[0.984770006762304,0.015229993237696027], prediction=0.000000
(6, spark hadoop spark) --> prob=[0.13412348342566147,0.8658765165743385], prediction=1.000000
(7, apache hadoop) --> prob=[0.9955732114398529,0.00442678856014711], prediction=0.000000
