<a href="https://colab.research.google.com/github/omchaudhary07/Advertising-OLS-library-python-/blob/master/spark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [6]:
# Install a headless OpenJDK 8 for Spark; JAVA_HOME is pointed at this install further down. Output is silenced.
!apt-get install openjdk-8-jdk-headless -qq > /dev/null


In [14]:
#!wget -q https://www-us.apache.org/dist/spark/spark-3.0.1/spark-3.0.1-bin-hadoop2.7.tgz
!wget -q https://archive.apache.org/dist/spark/spark-3.0.1/spark-3.0.1-bin-hadoop3.2.tgz

In [15]:
!tar xf spark-3.0.0-bin-hadoop3.2.tgz


In [16]:
!pip install -q findspark

In [19]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop3.2"

In [20]:
import findspark
findspark.init()

In [21]:
findspark.find()


'/content/spark-3.0.0-bin-hadoop3.2'

In [22]:
from pyspark.sql import SparkSession

# Local-mode session for Colab, with the Spark UI served on port 4050.
spark = (
    SparkSession.builder
    .master("local")
    .appName("Colab")
    .config("spark.ui.port", "4050")
    .getOrCreate()
)

In [23]:
# Bare expression: displays the SparkSession object to confirm the session is up.
spark


In [24]:
# Relative path: train.csv must already be in the working directory (it is not downloaded here).
# First line is the header; column types are inferred from the data.
df = spark.read.option("header", True).option("inferSchema", True).csv("train.csv")


In [27]:
# Confirms the reader returned a Spark DataFrame (not a pandas one).
type(df)

pyspark.sql.dataframe.DataFrame

In [29]:
df.show(n=5)  # print the first 5 rows -- the Spark counterpart of pandas' df.head()

+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|User_ID|Product_ID|Gender| Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|1000001| P00069042|     F|0-17|        10|            A|                         2|             0|                 3|              null|              null|    8370|
|1000001| P00248942|     F|0-17|        10|            A|                         2|             0|                 1|                 6|                14|   15200|
|1000001| P00087842|     F|0-17|        10|            A|                         2|             0|                12|              null|              null|    1422|
|100

In [31]:
# Number of rows in the DataFrame.
df.count()

203282

In [32]:
print(f"({df.count()}, {len(df.columns)})")  # (rows, columns) -- mirrors pandas' df.shape

(203282, 12)


In [34]:
# Display specific columns

In [40]:
df.select(["User_ID", "Gender", "Age", "Occupation"]).show(5)  # project a subset of columns

+-------+------+----+----------+
|User_ID|Gender| Age|Occupation|
+-------+------+----+----------+
|1000001|     F|0-17|        10|
|1000001|     F|0-17|        10|
|1000001|     F|0-17|        10|
|1000001|     F|0-17|        10|
|1000002|     M| 55+|        16|
+-------+------+----+----------+
only showing top 5 rows



In [41]:
# Summary statistics per column (count, mean, ...). Nulls are not counted, which is why
# Product_Category_2 and Product_Category_3 report fewer rows than the other columns.
df.describe().show()

+-------+------------------+----------+------+------+-----------------+-------------+--------------------------+-------------------+------------------+------------------+------------------+-----------------+
|summary|           User_ID|Product_ID|Gender|   Age|       Occupation|City_Category|Stay_In_Current_City_Years|     Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|         Purchase|
+-------+------------------+----------+------+------+-----------------+-------------+--------------------------+-------------------+------------------+------------------+------------------+-----------------+
|  count|            203282|    203282|203282|203282|           203282|       203282|                    203282|             203282|            203282|            140237|             62257|           203282|
|   mean|1002927.6097096644|      null|  null|  null| 8.07452701173739|         null|        1.4692908891839636|0.40998219222557825| 5.292259029328716| 9.84814278685368

In [44]:
# Distinct values of a categorical column:
# the set of unique values that appear in one particular column.

df.select(df["City_Category"]).distinct().show()

+-------------+
|City_Category|
+-------------+
|            B|
|            C|
|            A|
+-------------+



In [47]:
from pyspark.sql import functions as F

# Total Purchase amount for each City_Category (groupBy + sum aggregation).
(
    df.groupBy("City_Category")
    .agg(F.sum("Purchase"))
    .show()
)

+-------------+-------------+
|City_Category|sum(Purchase)|
+-------------+-------------+
|            B|    782323106|
|            C|    617793171|
|            A|    494135032|
+-------------+-------------+



In [48]:
# Null count per column: when() yields a non-null marker only for null cells, and
# count() counts non-null values, so each output column holds that column's null count.
df.select(
    [F.count(F.when(F.col(c).isNull(), 1)).alias(c) for c in df.columns]
).show()

+-------+----------+------+---+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|User_ID|Product_ID|Gender|Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|
+-------+----------+------+---+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|      0|         0|     0|  0|         0|            0|                         0|             0|                 0|             63045|            141025|       0|
+-------+----------+------+---+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+



In [49]:
# Fill the nulls in the two sparsely populated product-category columns with 0.
df = df.na.fill({"Product_Category_2": 0, "Product_Category_3": 0})

In [50]:
# Re-check after filling: every column should now report 0 nulls.
df.select(
    [F.count(F.when(F.col(c).isNull(), 1)).alias(c) for c in df.columns]
).show()

+-------+----------+------+---+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|User_ID|Product_ID|Gender|Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|
+-------+----------+------+---+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|      0|         0|     0|  0|         0|            0|                         0|             0|                 0|                 0|                 0|       0|
+-------+----------+------+---+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+



In [52]:
# Save the cleaned data as CSV. Spark writes the path as a directory of part files.
# header=True keeps the column names, so the result can be read back with header=True
# just like train.csv. mode="overwrite" lets the cell be re-run instead of failing
# because the output path already exists.
df.write.csv(
    "/content/PySpark_tests/preprocessed_data/sample.csv",
    header=True,
    mode="overwrite",
)

In [53]:
# Conditionals -- TODO: no examples yet; the next cell only re-displays the cleaned data.

In [54]:
df.show(n=5)  # cleaned data: former nulls in Product_Category_2/3 now appear as 0

+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|User_ID|Product_ID|Gender| Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|1000001| P00069042|     F|0-17|        10|            A|                         2|             0|                 3|                 0|                 0|    8370|
|1000001| P00248942|     F|0-17|        10|            A|                         2|             0|                 1|                 6|                14|   15200|
|1000001| P00087842|     F|0-17|        10|            A|                         2|             0|                12|                 0|                 0|    1422|
|100

In [63]:
from pyspark import SparkContext
# getOrCreate() reuses a running SparkContext if there is one (presumably the one behind
# `spark` above) instead of starting a new one; `sc` is used by the RDD examples below.
sc = SparkContext.getOrCreate()

In [65]:
# .collect(): brings every element of the RDD back to the driver as a Python list.
collect_rdd = sc.parallelize(list(range(1, 11)))
print(collect_rdd.collect())

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]


In [66]:
## Actions in PySpark RDDs
# Actions return a value to the driver (printed below) rather than a new RDD.

# .count(): number of elements in the RDD -- duplicates included (5 appears twice here).
count_rdd = sc.parallelize([1, 2, 3, 4, 5, 5, 6, 7, 8, 9])
print(count_rdd.count())

10


In [67]:
# .first(): returns the first element of the RDD.
first_rdd = sc.parallelize(list(range(1, 11)))
print(first_rdd.first())

1


In [70]:
# .take(n): returns the first n elements as a list.
take_rdd = sc.parallelize(list(range(1, 6)))
print(take_rdd.take(4))

[1, 2, 3, 4]


In [71]:
# .reduce(): repeatedly combines two elements with the given function until a single
# value remains. With addition, this sums every element of the RDD.
reduce_rdd = sc.parallelize([1, 3, 4, 6])
print(reduce_rdd.reduce(lambda acc, item: acc + item))

14


In [72]:
# .saveAsTextFile(): writes the RDD out as text. The path 'file.txt' becomes a directory
# of part files, and the write fails if that path already exists, so output from a
# previous run is removed first to keep the cell re-runnable.
import shutil

save_rdd = sc.parallelize([1, 2, 3, 4, 5, 6])
shutil.rmtree('file.txt', ignore_errors=True)
save_rdd.saveAsTextFile('file.txt')

In [73]:
## Transformations in PySpark RDDs
# A transformation builds and returns a new RDD from an existing one.

# .map(): applies a function to every element of the RDD. Example: add 10 to each number,
# without writing the loop yourself.
my_rdd = sc.parallelize([1, 2, 3, 4])
print(my_rdd.map(lambda n: n + 10).collect())

[11, 12, 13, 14]


In [77]:
# .filter(): keeps only the elements for which the given condition returns True.

filter_rdd = sc.parallelize([2, 3, 4, 5, 6, 7])
print("Example 1:", filter_rdd.filter(lambda n: n % 2 == 0).collect())

filter_rdd_2 = sc.parallelize(['Rahul', 'Swati', 'Rohan', 'Shreya', 'Priya'])
print("Example 2:", filter_rdd_2.filter(lambda name: name.startswith('R')).collect())

Example 1: [2, 4, 6]
Example 2: ['Rahul', 'Rohan']


In [78]:
# .union(): concatenates two RDDs into one. Unlike a set union it keeps duplicates --
# 6 is both even and a multiple of 3, so it appears twice in the result.

union_inp = sc.parallelize([2, 4, 5, 6, 7, 8, 9])
union_rdd_1 = union_inp.filter(lambda n: n % 2 == 0)  # even numbers
union_rdd_2 = union_inp.filter(lambda n: n % 3 == 0)  # multiples of 3
print(union_rdd_1.union(union_rdd_2).collect())

[2, 4, 6, 8, 6, 9]


In [82]:
# .flatMap(): like .map(), but the function returns a sequence for each element and all
# the sequences are flattened into one RDD -- here each sentence becomes its words.

flatmap_rdd = sc.parallelize(["Hey there", "This is PySpark RDD Transformations"])
flatmap_rdd.flatMap(lambda sentence: sentence.split(" ")).collect()


['Hey', 'there', 'This', 'is', 'PySpark', 'RDD', 'Transformations']

In [83]:
## PySpark Pair RDD Operations

# Pair RDDs hold (key, value) tuples and come with a dedicated set of operations.
# They are used widely because much real-world data is naturally key/value shaped:
# the key identifies a record and the value carries its data.

marks = [('Rahul', 88), ('Swati', 92), ('Shreya', 83), ('Abhay', 93), ('Rohan', 78)]
sc.parallelize(marks).collect()

[('Rahul', 88), ('Swati', 92), ('Shreya', 83), ('Abhay', 93), ('Rohan', 78)]

In [84]:
# .reduceByKey(): merges all values that share a key with the given function
# (here addition), producing one (key, combined value) pair per key.

marks_rdd = sc.parallelize([
    ('Rahul', 25), ('Swati', 26), ('Shreya', 22), ('Abhay', 29), ('Rohan', 22),
    ('Rahul', 23), ('Swati', 19), ('Shreya', 28), ('Abhay', 26), ('Rohan', 22),
])
print(marks_rdd.reduceByKey(lambda a, b: a + b).collect())

[('Rahul', 48), ('Swati', 45), ('Shreya', 50), ('Abhay', 55), ('Rohan', 44)]


In [86]:
# .sortByKey(): returns a new RDD with the (key, value) pairs ordered by key.
# Its first parameter is the boolean `ascending` (False sorts descending). Pass a bool:
# any non-empty string, even 'descending', is truthy and would still sort ascending.

marks_rdd = sc.parallelize([
    ('Rahul', 25), ('Swati', 26), ('Shreya', 22), ('Abhay', 29), ('Rohan', 22),
    ('Rahul', 23), ('Swati', 19), ('Shreya', 28), ('Abhay', 26), ('Rohan', 22),
])
print(marks_rdd.sortByKey(ascending=True).collect())

[('Abhay', 29), ('Abhay', 26), ('Rahul', 25), ('Rahul', 23), ('Rohan', 22), ('Rohan', 22), ('Shreya', 22), ('Shreya', 28), ('Swati', 26), ('Swati', 19)]


In [87]:
# .groupByKey(): gathers all values that share a key into one iterable per key.

marks_rdd = sc.parallelize([
    ('Rahul', 25), ('Swati', 26), ('Shreya', 22), ('Abhay', 29), ('Rohan', 22),
    ('Rahul', 23), ('Swati', 19), ('Shreya', 28), ('Abhay', 26), ('Rohan', 22),
])
dict_rdd = marks_rdd.groupByKey().collect()  # list of (key, iterable of values) pairs
for key, value in dict_rdd:
    print(key, [*value])  # materialise the grouped values for printing

Rahul [25, 23]
Swati [26, 19]
Shreya [22, 28]
Abhay [29, 26]
Rohan [22, 22]


In [88]:
# The .countByKey() action counts how many values each key has and returns the result
# to the driver as a dictionary {key: count}. Because it is a dictionary, the usual
# methods (.keys(), .values(), .items()) can be used to iterate over it.

marks_rdd = sc.parallelize([
    ('Rahul', 25), ('Swati', 26), ('Rohan', 22), ('Rahul', 23),
    ('Swati', 19), ('Shreya', 28), ('Abhay', 26), ('Rohan', 22),
])
dict_rdd = marks_rdd.countByKey().items()
for key, value in dict_rdd:
    print(key, value)

Rahul 2
Swati 2
Rohan 2
Shreya 1
Abhay 1
