<a href="https://colab.research.google.com/github/pratikesh3232/Pyspark_intro/blob/main/pyspark_MlLib.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler

In [None]:
# Create the Spark session used by the rest of the notebook
# (getOrCreate reuses a session if one is already running).
spk = (
    SparkSession.builder
    .appName('test')
    .getOrCreate()
)

In [None]:
# Load the sales CSV: the first line is the header and column types are inferred from the data.
df = spk.read.csv(
    '/content/sample_data/Big Sales Data.csv',
    header=True,
    inferSchema=True,
)

In [None]:
# Preview the loaded sales data as a text table.
df.show()

+---------------+-----------+----------------+---------------+------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+
|Item_Identifier|Item_Weight|Item_Fat_Content|Item_Visibility|   Item_Type|Item_MRP|Outlet_Identifier|Outlet_Establishment_Year|Outlet_Size|Outlet_Location_Type|      Outlet_Type|Item_Outlet_Sales|
+---------------+-----------+----------------+---------------+------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+
|          FDT36|       12.3|         Low Fat|    0.111447593|Baking Goods| 33.4874|           OUT049|                     1999|     Medium|              Tier 1|Supermarket Type1|      436.6087212|
|          FDT36|       12.3|         Low Fat|    0.111904005|Baking Goods| 33.9874|           OUT017|                     2007|     Medium|              Tier 2|Supermarket Type1|      443.1277212|
|         

In [None]:
# List the column names of the loaded DataFrame.
df.columns

['Item_Identifier',
 'Item_Weight',
 'Item_Fat_Content',
 'Item_Visibility',
 'Item_Type',
 'Item_MRP',
 'Outlet_Identifier',
 'Outlet_Establishment_Year',
 'Outlet_Size',
 'Outlet_Location_Type',
 'Outlet_Type',
 'Item_Outlet_Sales']

# Selecting and Accessing Data

In [None]:
# Attribute access yields a Column expression, not the column's values.
df.Outlet_Type

Column<'Outlet_Type'>

In [None]:
# Select a single column by passing the Column object.
df.select(df.Outlet_Type).show()

+-----------------+
|      Outlet_Type|
+-----------------+
|Supermarket Type1|
|Supermarket Type1|
|Supermarket Type2|
|    Grocery Store|
|Supermarket Type1|
|Supermarket Type1|
|Supermarket Type1|
|Supermarket Type3|
|Supermarket Type1|
|Supermarket Type1|
|Supermarket Type1|
|Supermarket Type1|
|Supermarket Type1|
|Supermarket Type2|
|Supermarket Type2|
|Supermarket Type3|
|Supermarket Type1|
|Supermarket Type1|
|Supermarket Type1|
|Supermarket Type1|
+-----------------+
only showing top 20 rows



In [None]:
# Same selection, this time referring to the column by its name.
df.select("Outlet_Type").show()

+-----------------+
|      Outlet_Type|
+-----------------+
|Supermarket Type1|
|Supermarket Type1|
|Supermarket Type2|
|    Grocery Store|
|Supermarket Type1|
|Supermarket Type1|
|Supermarket Type1|
|Supermarket Type3|
|Supermarket Type1|
|Supermarket Type1|
|Supermarket Type1|
|Supermarket Type1|
|Supermarket Type1|
|Supermarket Type2|
|Supermarket Type2|
|Supermarket Type3|
|Supermarket Type1|
|Supermarket Type1|
|Supermarket Type1|
|Supermarket Type1|
+-----------------+
only showing top 20 rows



## Assigning a new Column

In [None]:
# Show the data with an extra column holding twice the item weight.
# The result is only displayed; `df` is not reassigned.
df.withColumn(
    "Add_new_col",
    df["Item_Weight"] * 2,
).show()

+---------------+-----------+----------------+---------------+------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+-----------+
|Item_Identifier|Item_Weight|Item_Fat_Content|Item_Visibility|   Item_Type|Item_MRP|Outlet_Identifier|Outlet_Establishment_Year|Outlet_Size|Outlet_Location_Type|      Outlet_Type|Item_Outlet_Sales|Add_new_col|
+---------------+-----------+----------------+---------------+------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+-----------+
|          FDT36|       12.3|         Low Fat|    0.111447593|Baking Goods| 33.4874|           OUT049|                     1999|     Medium|              Tier 1|Supermarket Type1|      436.6087212|       24.6|
|          FDT36|       12.3|         Low Fat|    0.111904005|Baking Goods| 33.9874|           OUT017|                     2007|     Medium|              Tier 2

## To select a subset of rows, use DataFrame.filter()

In [None]:
# Keep rows that satisfy both conditions; each comparison needs its own
# parentheses because `&` binds tighter than `>` and `==`.
df.filter(
    (df["Item_Weight"] > 1.0)
    & (df["Item_Fat_Content"] == "Low Fat")
).show()

+---------------+-----------+----------------+---------------+------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+
|Item_Identifier|Item_Weight|Item_Fat_Content|Item_Visibility|   Item_Type|Item_MRP|Outlet_Identifier|Outlet_Establishment_Year|Outlet_Size|Outlet_Location_Type|      Outlet_Type|Item_Outlet_Sales|
+---------------+-----------+----------------+---------------+------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+
|          FDT36|       12.3|         Low Fat|    0.111447593|Baking Goods| 33.4874|           OUT049|                     1999|     Medium|              Tier 1|Supermarket Type1|      436.6087212|
|          FDT36|       12.3|         Low Fat|    0.111904005|Baking Goods| 33.9874|           OUT017|                     2007|     Medium|              Tier 2|Supermarket Type1|      443.1277212|
|         

# Grouping Data

**PySpark DataFrame also provides a way of handling grouped data using the common split-apply-combine strategy: it groups the data by a certain condition, applies a function to each group, and then combines the results back into a DataFrame.**

In [None]:
# Count rows per fat-content label. The raw labels are not normalized:
# "Low Fat", "low fat" and "LF" (and likewise "Regular" and "reg") come out as separate groups.
df.groupBy("Item_Fat_Content").count().show()

+----------------+-----+
|Item_Fat_Content|count|
+----------------+-----+
|         low fat|  178|
|         Low Fat| 8485|
|              LF|  522|
|         Regular| 4824|
|             reg|  195|
+----------------+-----+



In [None]:
# avg() with no arguments averages every numeric column within each group.
df.groupBy("Item_Fat_Content").avg().show()

+----------------+------------------+--------------------+------------------+------------------------------+----------------------+
|Item_Fat_Content|  avg(Item_Weight)|avg(Item_Visibility)|     avg(Item_MRP)|avg(Outlet_Establishment_Year)|avg(Item_Outlet_Sales)|
+----------------+------------------+--------------------+------------------+------------------------------+----------------------+
|         low fat|12.161865671641788| 0.06726006615168541|137.12802359550557|            1996.0393258426966|    2198.8726229106746|
|         Low Fat|12.931448875689632| 0.06398324064124945|141.45435670005895|            1997.8800235710078|     2198.114992464652|
|              LF|12.697645011600935|  0.0678887477490421|138.26792490421445|            1997.6245210727968|     2049.586111552299|
|         Regular|12.574764162715026| 0.06905577966314269|140.84748843283552|             1997.810945273632|    2182.5446078800956|
|             reg|12.597034883720925| 0.06851380582051281|136.21308307692308

# Getting Data In/Out

In [None]:
# Write the sales data out as CSV, then read that same output back to check the round trip.
# mode="overwrite" keeps the cell re-runnable: the default save mode raises an error
# when the target path already exists (Spark creates a directory of part files at this path).
output_path = '/usr/Big Sales Data3.csv'
df.write.csv(output_path, header=True, mode="overwrite")
spk.read.csv(output_path, header=True).show()

+---------------+-----------+----------------+---------------+------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+
|Item_Identifier|Item_Weight|Item_Fat_Content|Item_Visibility|   Item_Type|Item_MRP|Outlet_Identifier|Outlet_Establishment_Year|Outlet_Size|Outlet_Location_Type|      Outlet_Type|Item_Outlet_Sales|
+---------------+-----------+----------------+---------------+------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+
|          FDT36|       12.3|         Low Fat|    0.111447593|Baking Goods| 33.4874|           OUT049|                     1999|     Medium|              Tier 1|Supermarket Type1|      436.6087212|
|          FDT36|       12.3|         Low Fat|    0.111904005|Baking Goods| 33.9874|           OUT017|                     2007|     Medium|              Tier 2|Supermarket Type1|      443.1277212|
|         

# API Reference

**https://spark.apache.org/docs/latest/api/python/reference/index.html**

## Functions

In [None]:
from pyspark.sql.functions import slice as pyspark_slice

# Two rows, each holding one integer array in column "x".
df1 = spk.createDataFrame(
    data=[([1, 2, 3],), ([4, 5, 4, 5, 6],)],
    schema=['x'],
)

# slice(column, start, length): positions are 1-based, so this takes up to
# three elements starting at the second one of every array.
df1.select(
    pyspark_slice(df1.x, 2, 3)
).collect()

[Row(slice(x, 2, 3)=[2, 3]), Row(slice(x, 2, 3)=[5, 4, 5])]

In [None]:
# Display the array-column DataFrame created in the previous cell.
df1.show()

+---------------+
|              x|
+---------------+
|      [1, 2, 3]|
|[4, 5, 4, 5, 6]|
+---------------+



In [None]:
from pyspark.sql.functions import concat
# One-row DataFrame with two string columns.
df = spk.createDataFrame(data=[('abcd', '123')], schema=['s', 'd'])
df.show()
# Concatenate both columns into a single column named "s".
# NOTE: collect() returns a plain Python list of Row objects, so from here on
# `df` is that list and no longer a DataFrame.
df = df.select(
    concat(df["s"], df["d"]).alias('s')
).collect()


+----+---+
|   s|  d|
+----+---+
|abcd|123|
+----+---+



In [None]:
# `df` was rebound to the collect() result in the previous cell, so this displays a list of Row objects.
df

[Row(s='abcd123')]

# Basic Statistics

## Correlation

In [None]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.stat import Correlation

# Four 4-dimensional feature vectors, mixing sparse and dense representations.
# Feature index 2 is zero in every vector, which is why its correlations print as nan.
data = [
    (Vectors.sparse(4, [(0, 1.0), (3, -2.0)]),),
    (Vectors.dense([4.0, 5.0, 0.0, 3.0]),),
    (Vectors.dense([6.0, 7.0, 0.0, 8.0]),),
    (Vectors.sparse(4, [(0, 9.0), (3, 1.0)]),),
]
df = spk.createDataFrame(data, schema=["features"])

# Correlation.corr returns a DataFrame; head() takes its first Row and [0] is the matrix.
# Pearson is the default method.
r1 = Correlation.corr(df, "features").head()
print("Pearson correlation matrix:\n" + str(r1[0]))

# Same computation using Spearman rank correlation.
r2 = Correlation.corr(df, "features", method="spearman").head()
print("Spearman correlation matrix:\n" + str(r2[0]))

Pearson correlation matrix:
DenseMatrix([[1.        , 0.05564149,        nan, 0.40047142],
             [0.05564149, 1.        ,        nan, 0.91359586],
             [       nan,        nan, 1.        ,        nan],
             [0.40047142, 0.91359586,        nan, 1.        ]])
Spearman correlation matrix:
DenseMatrix([[1.        , 0.10540926,        nan, 0.4       ],
             [0.10540926, 1.        ,        nan, 0.9486833 ],
             [       nan,        nan, 1.        ,        nan],
             [0.4       , 0.9486833 ,        nan, 1.        ]])


# MLlib (https://spark.apache.org/docs/latest/ml-features.html#tf-idf)

# Word2Vec

In [None]:
from pyspark.ml.feature import Word2Vec

In [None]:
# Input data: each row is a bag of words from a sentence or document.
sentences = [
    "Hi I heard about Spark",
    "I wish Java could use case classes",
    "Logistic regression models are neat",
]
# Split every sentence on single spaces and wrap it in a 1-tuple so it becomes one "text" column.
documentDF = spk.createDataFrame(
    [(sentence.split(" "),) for sentence in sentences],
    ["text"],
)

In [None]:
# Each row holds one tokenized sentence in the "text" column.
documentDF.show()

+--------------------+
|                text|
+--------------------+
|[Hi, I, heard, ab...|
|[I, wish, Java, c...|
|[Logistic, regres...|
+--------------------+



In [None]:
# Learn a mapping from words to 3-dimensional vectors.
# minCount=0 keeps every word, even those that occur only once.
# An explicit seed makes the learned vectors repeatable when the notebook is re-run;
# without it PySpark derives the default seed from a Python hash that can differ between sessions.
word2Vec = Word2Vec(
    vectorSize=3,
    minCount=0,
    seed=42,
    inputCol="text",
    outputCol="result",
)
model = word2Vec.fit(documentDF)

In [None]:
# Apply the fitted Word2Vec model; the vectors land in the "result" output column.
result = model.transform(documentDF)

In [None]:
# Show each document next to its vector (long values are truncated in the table).
result.show()

+--------------------+--------------------+
|                text|              result|
+--------------------+--------------------+
|[Hi, I, heard, ab...|[0.02088536694645...|
|[I, wish, Java, c...|[-0.0270776738678...|
|[Logistic, regres...|[0.03027330562472...|
+--------------------+--------------------+



In [None]:
# Print every document together with the full (untruncated) vector computed for it.
for row in result.collect():
    tokens, doc_vector = row
    joined_tokens = ", ".join(tokens)
    print("Text: [%s] => \nVector: %s\n" % (joined_tokens, str(doc_vector)))

Text: [Hi, I, heard, about, Spark] => 
Vector: [0.020885366946458817,-0.05819434002187336,-0.03002806007862091]

Text: [I, wish, Java, could, use, case, classes] => 
Vector: [-0.027077673867877038,-0.07107515899198395,0.0086154043674469]

Text: [Logistic, regression, models, are, neat] => 
Vector: [0.030273305624723437,0.046297441329807044,0.1026412695646286]



## Tokenizer

In [None]:
from pyspark.ml.feature import Tokenizer

In [None]:
# Raw sentences keyed by id. The third sentence separates its words with
# commas instead of spaces.
sentenceDataFrame = spk.createDataFrame(
    data=[
        (0, "Hi I heard about Spark"),
        (1, "I wish Java could use case classes"),
        (2, "Logistic,regression,models,are,neat"),
    ],
    schema=["id", "sentence"],
)

In [None]:
# Tokenizer lower-cases the text and splits on whitespace only, so the
# comma-separated third sentence stays a single token.
# NOTE(review): a RegexTokenizer with a non-word pattern would split it; confirm whether that is wanted.
tokenizer = Tokenizer(outputCol="words", inputCol="sentence")
tokenized = tokenizer.transform(sentenceDataFrame)

In [None]:
# Show the sentences alongside the new "words" token column.
tokenized.show()

+---+--------------------+--------------------+
| id|            sentence|               words|
+---+--------------------+--------------------+
|  0|Hi I heard about ...|[hi, i, heard, ab...|
|  1|I wish Java could...|[i, wish, java, c...|
|  2|Logistic,regressi...|[logistic,regress...|
+---+--------------------+--------------------+



In [None]:
# Print the id, original sentence and token list of every row.
# The loop variables are named so that they do not shadow the built-in `id`.
for row in tokenized.collect():
    row_id, sentence, words = row  # unpack the three columns in schema order
    print("ID: [%s], Text: [%s] => \nwords: [%s]\n" % (row_id, sentence, str(words)))

ID: [0], Text: [Hi I heard about Spark] => 
words: [['hi', 'i', 'heard', 'about', 'spark']]

ID: [1], Text: [I wish Java could use case classes] => 
words: [['i', 'wish', 'java', 'could', 'use', 'case', 'classes']]

ID: [2], Text: [Logistic,regression,models,are,neat] => 
words: [['logistic,regression,models,are,neat']]



## StopWordsRemover

In [None]:
from pyspark.ml.feature import StopWordsRemover

In [None]:
# Remove stop words from the "words" tokens using the remover's default word list;
# the surviving tokens are written to the "filtered" column.
stop_word = StopWordsRemover(outputCol="filtered", inputCol="words")
df = stop_word.transform(tokenized)

In [None]:
# Compare the original tokens ("words") with the stop-word-free tokens ("filtered").
df.show()

+---+--------------------+--------------------+--------------------+
| id|            sentence|               words|            filtered|
+---+--------------------+--------------------+--------------------+
|  0|Hi I heard about ...|[hi, i, heard, ab...|  [hi, heard, spark]|
|  1|I wish Java could...|[i, wish, java, c...|[wish, java, use,...|
|  2|Logistic,regressi...|[logistic,regress...|[logistic,regress...|
+---+--------------------+--------------------+--------------------+



In [None]:
# Print the id, sentence, tokens and stop-word-filtered tokens of every row.
# The loop variables are named so that they do not shadow the built-ins `id` and `filter`.
for row in df.collect():
    row_id, sentence, words, filtered = row  # unpack the four columns in schema order
    print("ID: [%s], Text: [%s] => \nwords: [%s]\nFiltered: [%s]" % (row_id, sentence, str(words), str(filtered)))
    print("============================")

ID: [0], Text: [Hi I heard about Spark] => 
words: [['hi', 'i', 'heard', 'about', 'spark']]
Filtered: [['hi', 'heard', 'spark']]
ID: [1], Text: [I wish Java could use case classes] => 
words: [['i', 'wish', 'java', 'could', 'use', 'case', 'classes']]
Filtered: [['wish', 'java', 'use', 'case', 'classes']]
ID: [2], Text: [Logistic,regression,models,are,neat] => 
words: [['logistic,regression,models,are,neat']]
Filtered: [['logistic,regression,models,are,neat']]


# Word2Vec on the filtered tokens

In [None]:
from pyspark.ml.feature import Word2Vec

In [None]:
# Learn a mapping from the stop-word-filtered tokens to 3-dimensional vectors.
# minCount=0 keeps every token, even those that occur only once.
# An explicit seed makes the learned vectors repeatable when the notebook is re-run;
# without it PySpark derives the default seed from a Python hash that can differ between sessions.
word2Vec = Word2Vec(
    vectorSize=3,
    minCount=0,
    seed=42,
    inputCol="filtered",
    outputCol="result",
)
model = word2Vec.fit(df)

In [None]:
# Add the Word2Vec vectors as the "result" column.
# Dropping "result" first keeps this cell re-runnable: transform() refuses to write an
# output column that already exists, and drop() is a no-op when the column is absent.
df = model.transform(df.drop("result"))

In [None]:
# Show the tokens, filtered tokens and the Word2Vec vector ("result") for each sentence.
df.show()

+---+--------------------+--------------------+--------------------+--------------------+
| id|            sentence|               words|            filtered|              result|
+---+--------------------+--------------------+--------------------+--------------------+
|  0|Hi I heard about ...|[hi, i, heard, ab...|  [hi, heard, spark]|[-0.0205548585702...|
|  1|I wish Java could...|[i, wish, java, c...|[wish, java, use,...|[0.00869127139449...|
|  2|Logistic,regressi...|[logistic,regress...|[logistic,regress...|[-0.0058325133286...|
+---+--------------------+--------------------+--------------------+--------------------+



In [None]:
# Print every column of each row, including the full (untruncated) Word2Vec vector.
# The loop variables are named so that they do not shadow the built-in `id`.
for row in df.collect():
    row_id, sentence, words, filtered, result = row  # unpack the five columns in schema order
    print("ID: [%s], Text: [%s] => \nwords: [%s]\nFiltered: [%s]\nresult: [%s]" % (row_id, sentence, str(words), str(filtered), result))
    print("============================")

ID: [0], Text: [Hi I heard about Spark] => 
words: [['hi', 'i', 'heard', 'about', 'spark']]
Filtered: [['hi', 'heard', 'spark']]
result: [[-0.020554858570297558,0.0015943112472693124,0.09813929721713066]]
ID: [1], Text: [I wish Java could use case classes] => 
words: [['i', 'wish', 'java', 'could', 'use', 'case', 'classes']]
Filtered: [['wish', 'java', 'use', 'case', 'classes']]
result: [[0.008691271394491195,-0.0520847249776125,0.016328829526901244]]
ID: [2], Text: [Logistic,regression,models,are,neat] => 
words: [['logistic,regression,models,are,neat']]
Filtered: [['logistic,regression,models,are,neat']]
result: [[-0.005832513328641653,-0.034841299057006836,0.03432728722691536]]


## n-gram

In [None]:
from pyspark.ml.feature import NGram

In [None]:
# Pre-tokenized sentences keyed by id; "words" is an array-of-strings column.
wordDataFrame = spk.createDataFrame(
    data=[
        (0, ["Hi", "I", "heard", "about", "Spark"]),
        (1, ["I", "wish", "Java", "could", "use", "case", "classes"]),
        (2, ["Logistic", "regression", "models", "are", "neat"]),
    ],
    schema=["id", "words"],
)

In [None]:
# Build bigrams (n=2): each output entry joins two consecutive words with a space.
ngram = NGram(
    n=2,
    outputCol="ngrams",
    inputCol="words",
)
ngramDataFrame = ngram.transform(wordDataFrame)

ngramDataFrame.show()

+---+--------------------+--------------------+
| id|               words|              ngrams|
+---+--------------------+--------------------+
|  0|[Hi, I, heard, ab...|[Hi I, I heard, h...|
|  1|[I, wish, Java, c...|[I wish, wish Jav...|
|  2|[Logistic, regres...|[Logistic regress...|
+---+--------------------+--------------------+



In [86]:
# Print the id, word list and generated bigrams of every row.
# The loop variables are named so that they do not shadow the built-in `id`.
for row in ngramDataFrame.collect():
    row_id, words, ngrams = row  # unpack the three columns in schema order
    print("ID: [%s], Text: [%s] => \nngrams: [%s] => \n" % (row_id, str(words), str(ngrams)))
    print("============================")

ID: [0], Text: [['Hi', 'I', 'heard', 'about', 'Spark']] => 
ngrams: [['Hi I', 'I heard', 'heard about', 'about Spark']] => 

ID: [1], Text: [['I', 'wish', 'Java', 'could', 'use', 'case', 'classes']] => 
ngrams: [['I wish', 'wish Java', 'Java could', 'could use', 'use case', 'case classes']] => 

ID: [2], Text: [['Logistic', 'regression', 'models', 'are', 'neat']] => 
ngrams: [['Logistic regression', 'regression models', 'models are', 'are neat']] => 



## OneHotEncoder

In [88]:
from pyspark.ml.feature import OneHotEncoder

# Two columns of category indices (already numeric, values 0.0 to 2.0).
df = spk.createDataFrame(
    data=[
        (0.0, 1.0),
        (1.0, 0.0),
        (2.0, 1.0),
        (0.0, 2.0),
        (0.0, 1.0),
        (2.0, 0.0),
    ],
    schema=["categoryIndex1", "categoryIndex2"],
)

# Encode both index columns in one pass. In the displayed output the highest
# index (2.0) becomes the all-zero vector, i.e. the last category gets no slot of its own.
encoder = OneHotEncoder(
    inputCols=["categoryIndex1", "categoryIndex2"],
    outputCols=["categoryVec1", "categoryVec2"],
)
model = encoder.fit(df)
encoded = model.transform(df)

encoded.show()

+--------------+--------------+-------------+-------------+
|categoryIndex1|categoryIndex2| categoryVec1| categoryVec2|
+--------------+--------------+-------------+-------------+
|           0.0|           1.0|(2,[0],[1.0])|(2,[1],[1.0])|
|           1.0|           0.0|(2,[1],[1.0])|(2,[0],[1.0])|
|           2.0|           1.0|    (2,[],[])|(2,[1],[1.0])|
|           0.0|           2.0|(2,[0],[1.0])|    (2,[],[])|
|           0.0|           1.0|(2,[0],[1.0])|(2,[1],[1.0])|
|           2.0|           0.0|    (2,[],[])|(2,[0],[1.0])|
+--------------+--------------+-------------+-------------+



## MinMaxScaler

In [90]:
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.linalg import Vectors

# Three rows, each with a 3-dimensional dense feature vector.
dataFrame = spk.createDataFrame(
    data=[
        (0, Vectors.dense([1.0, 0.1, -1.0]),),
        (1, Vectors.dense([2.0, 1.1, 1.0]),),
        (2, Vectors.dense([3.0, 10.1, 3.0]),),
    ],
    schema=["id", "features"],
)

scaler = MinMaxScaler(outputCol="scaledFeatures", inputCol="features")

# Fitting computes the per-feature summary statistics and yields a MinMaxScalerModel.
scalerModel = scaler.fit(dataFrame)

# Rescale each feature independently into the scaler's [min, max] range.
scaledData = scalerModel.transform(dataFrame)

print("Features scaled to range: [%f, %f]" % (scaler.getMin(), scaler.getMax()))
scaledData.select("features", "scaledFeatures").show()

Features scaled to range: [0.000000, 1.000000]
+--------------+--------------+
|      features|scaledFeatures|
+--------------+--------------+
|[1.0,0.1,-1.0]|     (3,[],[])|
| [2.0,1.1,1.0]| [0.5,0.1,0.5]|
|[3.0,10.1,3.0]| [1.0,1.0,1.0]|
+--------------+--------------+



## Imputer

In [91]:
from pyspark.ml.feature import Imputer

# Two numeric columns with missing values encoded as NaN.
nan = float("nan")
df = spk.createDataFrame(
    data=[
        (1.0, nan),
        (2.0, nan),
        (nan, 3.0),
        (4.0, 4.0),
        (5.0, 5.0),
    ],
    schema=["a", "b"],
)

# Fill the gaps column by column. In the displayed output each NaN is replaced by the
# mean of that column's observed values (3.0 for "a", 4.0 for "b").
imputer = Imputer(
    inputCols=["a", "b"],
    outputCols=["out_a", "out_b"],
)
model = imputer.fit(df)

model.transform(df).show()

+---+---+-----+-----+
|  a|  b|out_a|out_b|
+---+---+-----+-----+
|1.0|NaN|  1.0|  4.0|
|2.0|NaN|  2.0|  4.0|
|NaN|3.0|  3.0|  3.0|
|4.0|4.0|  4.0|  4.0|
|5.0|5.0|  5.0|  5.0|
+---+---+-----+-----+

