# Tutorial 1

In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.2.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 41 kB/s 
[?25hCollecting py4j==0.10.9.2
  Downloading py4j-0.10.9.2-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 50.9 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.0-py2.py3-none-any.whl size=281805912 sha256=161b09f89605ffdb28783c53ec0b24951752711a9dc4dc4f62b2dd0e5b60616d
  Stored in directory: /root/.cache/pip/wheels/0b/de/d2/9be5d59d7331c6c2a7c1b6d1a4f463ce107332b1ecd4e80718
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.2 pyspark-3.2.0


In [2]:
import pyspark

In [3]:
import pandas as pd
# Preview the raw CSV with pandas before loading the same file with Spark below.
pd.read_csv("test1.csv")

Unnamed: 0,Name,Age
0,JoJo,12
1,Bizarre,15
2,Adventure,20


In [4]:
from pyspark.sql import SparkSession

In [5]:
# Build (or fetch) the Spark session used by this notebook, named 'Practice'.
practice_builder = SparkSession.builder.appName('Practice')
spark = practice_builder.getOrCreate()

In [6]:
spark

In [7]:
# Read with no options: the header line is treated as data and the columns get default names.
df_pyspark = spark.read.csv('test1.csv')

In [8]:
df_pyspark.show()

+---------+---+
|      _c0|_c1|
+---------+---+
|     Name|Age|
|     JoJo| 12|
|  Bizarre| 15|
|Adventure| 20|
+---------+---+



In [9]:
# Re-read the file, this time using the first line as the column names.
df_pyspark = spark.read.csv('test1.csv', header=True)
df_pyspark.show()

+---------+---+
|     Name|Age|
+---------+---+
|     JoJo| 12|
|  Bizarre| 15|
|Adventure| 20|
+---------+---+



In [10]:
type(df_pyspark)

pyspark.sql.dataframe.DataFrame

In [11]:
# First three rows as Row objects; Age is still a string because no schema was inferred on read.
df_pyspark.head(3)

[Row(Name='JoJo', Age='12'),
 Row(Name='Bizarre', Age='15'),
 Row(Name='Adventure', Age='20')]

In [12]:
df_pyspark.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Age: string (nullable = true)



# Tutorial 2
- PySpark DataFrame
- Reading the Dataset
- Checking the Datatypes of the Column (Schema)
- Selecting columns and indexing
- Adding columns
- Dropping columns
- Renaming columns

In [13]:
from pyspark.sql import SparkSession

In [14]:
# NOTE(review): a session was already created earlier in this notebook; getOrCreate()
# presumably returns that running session, so the 'Dataframe' app name may not take effect — confirm.
spark = SparkSession.builder.appName('Dataframe').getOrCreate()

In [15]:
spark

In [16]:
## Read the dataset, taking column names from the header row and inferring column types
typed_reader = spark.read.option('header', 'true').option('inferSchema', 'true')
df_pyspark2 = typed_reader.csv('test2.csv')
df_pyspark2.show()

+---------+---+----------+
|     Name|Age|Experience|
+---------+---+----------+
|     JoJo| 12|        10|
|  Bizarre| 15|         8|
|Adventure| 20|         4|
+---------+---+----------+



In [17]:
# Check the schema
df_pyspark2.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Experience: integer (nullable = true)



In [18]:
# Load test2.csv again, passing the reader options as keyword arguments.
df_pyspark2 = spark.read.csv('test2.csv', header=True, inferSchema=True)

In [19]:
df_pyspark2

DataFrame[Name: string, Age: int, Experience: int]

In [20]:
df_pyspark2.show()

+---------+---+----------+
|     Name|Age|Experience|
+---------+---+----------+
|     JoJo| 12|        10|
|  Bizarre| 15|         8|
|Adventure| 20|         4|
+---------+---+----------+



In [21]:
df_pyspark2.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Experience: integer (nullable = true)



In [22]:
type(df_pyspark2)

pyspark.sql.dataframe.DataFrame

In [23]:
df_pyspark2.columns

['Name', 'Age', 'Experience']

In [24]:
df_pyspark2.head(3)

[Row(Name='JoJo', Age=12, Experience=10),
 Row(Name='Bizarre', Age=15, Experience=8),
 Row(Name='Adventure', Age=20, Experience=4)]

In [25]:
df_pyspark2.show()

+---------+---+----------+
|     Name|Age|Experience|
+---------+---+----------+
|     JoJo| 12|        10|
|  Bizarre| 15|         8|
|Adventure| 20|         4|
+---------+---+----------+



In [26]:
df_pyspark2.select('Name').show()

+---------+
|     Name|
+---------+
|     JoJo|
|  Bizarre|
|Adventure|
+---------+



In [27]:
type(df_pyspark2.select('Name'))

pyspark.sql.dataframe.DataFrame

In [28]:
# Project two columns and display them.
name_and_experience = df_pyspark2.select('Name', 'Experience')
name_and_experience.show()

+---------+----------+
|     Name|Experience|
+---------+----------+
|     JoJo|        10|
|  Bizarre|         8|
|Adventure|         4|
+---------+----------+



In [29]:
df_pyspark2.dtypes

[('Name', 'string'), ('Age', 'int'), ('Experience', 'int')]

In [30]:
# Summary statistics for every column; mean and stddev come out null for the string column Name.
df_pyspark2.describe().show()

+-------+---------+------------------+-----------------+
|summary|     Name|               Age|       Experience|
+-------+---------+------------------+-----------------+
|  count|        3|                 3|                3|
|   mean|     null|15.666666666666666|7.333333333333333|
| stddev|     null| 4.041451884327381|3.055050463303893|
|    min|Adventure|                12|                4|
|    max|     JoJo|                20|               10|
+-------+---------+------------------+-----------------+



In [31]:
# Adding columns in PySpark Dataframe

# The new column holds the existing Experience value plus two.
experience_in_two_years = df_pyspark2['Experience'] + 2
df_pyspark2 = df_pyspark2.withColumn('Experience After 2 year', experience_in_two_years)

In [32]:
df_pyspark2.show()

+---------+---+----------+-----------------------+
|     Name|Age|Experience|Experience After 2 year|
+---------+---+----------+-----------------------+
|     JoJo| 12|        10|                     12|
|  Bizarre| 15|         8|                     10|
|Adventure| 20|         4|                      6|
+---------+---+----------+-----------------------+



In [33]:
# Drop the columns

# Remove the derived column again; the result is reassigned, so df_pyspark2 is back to three columns.
df_pyspark2 = df_pyspark2.drop('Experience After 2 year')
df_pyspark2.show()

+---------+---+----------+
|     Name|Age|Experience|
+---------+---+----------+
|     JoJo| 12|        10|
|  Bizarre| 15|         8|
|Adventure| 20|         4|
+---------+---+----------+



In [34]:
# Rename the columns

# The renamed frame is only displayed, not assigned, so df_pyspark2 itself keeps the 'Name' column.
df_pyspark2.withColumnRenamed('Name', 'New Name').show()

+---------+---+----------+
| New Name|Age|Experience|
+---------+---+----------+
|     JoJo| 12|        10|
|  Bizarre| 15|         8|
|Adventure| 20|         4|
+---------+---+----------+



# Tutorial 3
- Dropping columns
- Dropping rows
- Various parameters of the dropping functionality
- Handling missing values by Mean, Median, Mode

In [35]:
from pyspark.sql import SparkSession

# Spark session handle for the Tutorial 3 cells.
tutorial3_builder = SparkSession.builder.appName('Tutorial3')
spark = tutorial3_builder.getOrCreate()

In [36]:
# Load the Tutorial 3 dataset (header row, inferred types) and display it.
df_pyspark3 = spark.read.csv('test3.csv', header=True, inferSchema=True)
df_pyspark3.show()

+---------+----+----------+------+
|     Name| Age|Experience|Salary|
+---------+----+----------+------+
|     JoJo|  12|        10| 30000|
|  Bizarre|  15|         8| 25000|
|Adventure|  20|         4| 20000|
|     Paul|  24|         3| 20000|
|   Harsha|  21|         1| 15000|
| Shubharm|  23|         2| 18000|
|   Mahesh|null|      null| 40000|
|     null|  34|        10| 38000|
|     null|  36|      null|  null|
+---------+----+----------+------+



In [37]:
# Drop the column

# Shows the frame without 'Name'; the result is not assigned, so df_pyspark3 is unchanged.
df_pyspark3.drop('Name').show()

+----+----------+------+
| Age|Experience|Salary|
+----+----------+------+
|  12|        10| 30000|
|  15|         8| 25000|
|  20|         4| 20000|
|  24|         3| 20000|
|  21|         1| 15000|
|  23|         2| 18000|
|null|      null| 40000|
|  34|        10| 38000|
|  36|      null|  null|
+----+----------+------+



In [38]:
df_pyspark3.show()

+---------+----+----------+------+
|     Name| Age|Experience|Salary|
+---------+----+----------+------+
|     JoJo|  12|        10| 30000|
|  Bizarre|  15|         8| 25000|
|Adventure|  20|         4| 20000|
|     Paul|  24|         3| 20000|
|   Harsha|  21|         1| 15000|
| Shubharm|  23|         2| 18000|
|   Mahesh|null|      null| 40000|
|     null|  34|        10| 38000|
|     null|  36|      null|  null|
+---------+----+----------+------+



In [39]:
# With no arguments, every row that contains at least one null is removed.
df_pyspark3.na.drop().show()

+---------+---+----------+------+
|     Name|Age|Experience|Salary|
+---------+---+----------+------+
|     JoJo| 12|        10| 30000|
|  Bizarre| 15|         8| 25000|
|Adventure| 20|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
| Shubharm| 23|         2| 18000|
+---------+---+----------+------+



In [40]:
# how='any': drop a row if any of its columns is null (same output as the no-argument call)

df_pyspark3.na.drop(how='any').show()

+---------+---+----------+------+
|     Name|Age|Experience|Salary|
+---------+---+----------+------+
|     JoJo| 12|        10| 30000|
|  Bizarre| 15|         8| 25000|
|Adventure| 20|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
| Shubharm| 23|         2| 18000|
+---------+---+----------+------+



In [41]:
# Threshold

# thresh=2 keeps only the rows that have at least two non-null values.
# `how` is left out on purpose: when thresh is given it takes precedence
# over `how`, so passing how='any' here would have no effect.
df_pyspark3.na.drop(thresh=2).show()

+---------+----+----------+------+
|     Name| Age|Experience|Salary|
+---------+----+----------+------+
|     JoJo|  12|        10| 30000|
|  Bizarre|  15|         8| 25000|
|Adventure|  20|         4| 20000|
|     Paul|  24|         3| 20000|
|   Harsha|  21|         1| 15000|
| Shubharm|  23|         2| 18000|
|   Mahesh|null|      null| 40000|
|     null|  34|        10| 38000|
+---------+----+----------+------+



In [42]:
# Subset

# Only nulls in the 'Experience' column cause a row to be dropped.
df_pyspark3.dropna(how='any', subset=['Experience']).show()

+---------+---+----------+------+
|     Name|Age|Experience|Salary|
+---------+---+----------+------+
|     JoJo| 12|        10| 30000|
|  Bizarre| 15|         8| 25000|
|Adventure| 20|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
| Shubharm| 23|         2| 18000|
|     null| 34|        10| 38000|
+---------+---+----------+------+



In [43]:
# Filling the missing value

# Replace nulls in the 'Name' column with the placeholder string 'Kosong'.
df_pyspark3.na.fill(value='Kosong', subset=['Name']).show()

+---------+----+----------+------+
|     Name| Age|Experience|Salary|
+---------+----+----------+------+
|     JoJo|  12|        10| 30000|
|  Bizarre|  15|         8| 25000|
|Adventure|  20|         4| 20000|
|     Paul|  24|         3| 20000|
|   Harsha|  21|         1| 15000|
| Shubharm|  23|         2| 18000|
|   Mahesh|null|      null| 40000|
|   Kosong|  34|        10| 38000|
|   Kosong|  36|      null|  null|
+---------+----+----------+------+



In [44]:
from pyspark.ml.feature import Imputer

# Mean-impute the numeric columns, writing each result to a new '<column>_imputed' column.
numeric_cols = ['Age', 'Experience', 'Salary']
imputed_cols = [f"{name}_imputed" for name in numeric_cols]
imputer = Imputer(inputCols=numeric_cols, outputCols=imputed_cols).setStrategy("mean")

In [45]:
# Fit the imputer on the data, then apply the fitted model to the same frame.
imputer_model = imputer.fit(df_pyspark3)
imputer_model.transform(df_pyspark3).show()

+---------+----+----------+------+-----------+------------------+--------------+
|     Name| Age|Experience|Salary|Age_imputed|Experience_imputed|Salary_imputed|
+---------+----+----------+------+-----------+------------------+--------------+
|     JoJo|  12|        10| 30000|         12|                10|         30000|
|  Bizarre|  15|         8| 25000|         15|                 8|         25000|
|Adventure|  20|         4| 20000|         20|                 4|         20000|
|     Paul|  24|         3| 20000|         24|                 3|         20000|
|   Harsha|  21|         1| 15000|         21|                 1|         15000|
| Shubharm|  23|         2| 18000|         23|                 2|         18000|
|   Mahesh|null|      null| 40000|         23|                 5|         40000|
|     null|  34|        10| 38000|         34|                10|         38000|
|     null|  36|      null|  null|         36|                 5|         25750|
+---------+----+----------+------+-----------+------------------+--------------+

# Tutorial 4
- Filter operation
- &,|,==
- ~

In [46]:
from pyspark.sql import SparkSession

In [47]:
# NOTE(review): if an earlier session is still running, getOrCreate() presumably reuses it —
# confirm whether the 'Tutorial4' app name is actually applied.
spark = SparkSession.builder.appName('Tutorial4').getOrCreate()

In [48]:
# Load the Tutorial 4 dataset (header row, inferred types) and display it.
df_pyspark4 = spark.read.csv('test4.csv', header=True, inferSchema=True)
df_pyspark4.show()

+---------+---+----------+------+
|     Name|Age|Experience|Salary|
+---------+---+----------+------+
|     JoJo| 12|        10| 30000|
|  Bizarre| 15|         8| 25000|
|Adventure| 20|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
| Shubharm| 23|         2| 18000|
+---------+---+----------+------+



In [49]:
# Filter operations

# The condition is passed as an expression string rather than a Column object.
df_pyspark4.filter("Salary<=20000").show()

+---------+---+----------+------+
|     Name|Age|Experience|Salary|
+---------+---+----------+------+
|Adventure| 20|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
| Shubharm| 23|         2| 18000|
+---------+---+----------+------+



In [50]:
# Keep rows with Salary <= 20000, then show only the Name and Age columns.
low_salary_rows = df_pyspark4.filter("Salary<=20000")
low_salary_rows.select('Name', 'Age').show()

+---------+---+
|     Name|Age|
+---------+---+
|Adventure| 20|
|     Paul| 24|
|   Harsha| 21|
| Shubharm| 23|
+---------+---+



In [51]:
# Same filter, expressed as a Column comparison instead of a string.
is_low_salary = df_pyspark4['Salary'] <= 20000
df_pyspark4.filter(is_low_salary).show()

+---------+---+----------+------+
|     Name|Age|Experience|Salary|
+---------+---+----------+------+
|Adventure| 20|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
| Shubharm| 23|         2| 18000|
+---------+---+----------+------+



In [52]:
# Rows whose Salary lies in the inclusive range 15000..20000.
salary_in_range = df_pyspark4['Salary'].between(15000, 20000)
df_pyspark4.filter(salary_in_range).show()

+---------+---+----------+------+
|     Name|Age|Experience|Salary|
+---------+---+----------+------+
|Adventure| 20|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
| Shubharm| 23|         2| 18000|
+---------+---+----------+------+



In [53]:
# ~ negates the condition, so this keeps the rows where Salary <= 20000 is false.
df_pyspark4.filter(~(df_pyspark4['Salary']<=20000)).show()

+-------+---+----------+------+
|   Name|Age|Experience|Salary|
+-------+---+----------+------+
|   JoJo| 12|        10| 30000|
|Bizarre| 15|         8| 25000|
+-------+---+----------+------+



# Tutorial 5

In [54]:
from pyspark.sql import SparkSession  

In [55]:
# Spark session for the aggregation examples; the last line displays the session object.
agg_builder = SparkSession.builder.appName('Agg')
spark = agg_builder.getOrCreate()
spark

In [56]:
# Load the Tutorial 5 dataset with the header row as column names and inferred types, then display it.
test5_reader = spark.read.option('header', 'true').option('inferSchema', 'true')
df_pyspark5 = test5_reader.csv('test5.csv')
df_pyspark5.show()

+---------+------------+------+
|     Name| Departments|Salary|
+---------+------------+------+
|    Krish|Data Science| 10000|
|    Krish|         IOT|  5000|
|   Mahesh|    Big Data|  4000|
|    Krish|    Big Data|  4000|
|   Mahesh|Data Science|  3000|
|Sudhanshu|Data Science| 20000|
|Sudhanshu|         IOT| 10000|
|Sudhanshu|    Big Data|  5000|
|    Sunny|Data Science| 10000|
|    Sunny|    Big Data|  2000|
+---------+------------+------+



In [57]:
df_pyspark5.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Departments: string (nullable = true)
 |-- Salary: integer (nullable = true)



In [58]:
# GroupBy - total salary per name (this is a sum, not a maximum)
# The column is named explicitly so the aggregate does not silently pick up
# any other numeric column that may be added to the data later.
df_pyspark5.groupBy('Name').sum('Salary').show()

+---------+-----------+
|     Name|sum(Salary)|
+---------+-----------+
|Sudhanshu|      35000|
|    Sunny|      12000|
|    Krish|      19000|
|   Mahesh|       7000|
+---------+-----------+



In [59]:
df_pyspark5.groupBy('Name').avg().show()

+---------+------------------+
|     Name|       avg(Salary)|
+---------+------------------+
|Sudhanshu|11666.666666666666|
|    Sunny|            6000.0|
|    Krish| 6333.333333333333|
|   Mahesh|            3500.0|
+---------+------------------+



In [60]:
df_pyspark5.groupBy('Departments').sum().show()

+------------+-----------+
| Departments|sum(Salary)|
+------------+-----------+
|         IOT|      15000|
|    Big Data|      15000|
|Data Science|      43000|
+------------+-----------+



In [61]:
# Average salary per department; note the result column is labelled avg(Salary) even though mean() is called.
df_pyspark5.groupBy('Departments').mean().show()

+------------+-----------+
| Departments|avg(Salary)|
+------------+-----------+
|         IOT|     7500.0|
|    Big Data|     3750.0|
|Data Science|    10750.0|
+------------+-----------+



In [62]:
df_pyspark5.groupBy('Departments').count().show()

+------------+-----+
| Departments|count|
+------------+-----+
|         IOT|    2|
|    Big Data|    4|
|Data Science|    4|
+------------+-----+



In [63]:
# Total salary across the whole DataFrame (no grouping).
salary_total_spec = {'Salary': 'sum'}
df_pyspark5.agg(salary_total_spec).show()

+-----------+
|sum(Salary)|
+-----------+
|      73000|
+-----------+



# Tutorial 6
## Examples of PySpark ML

In [64]:
from pyspark.sql import SparkSession
# NOTE(review): the app name 'Missing' does not match this section's topic (PySpark ML) — it looks
# carried over from another section; getOrCreate() presumably reuses any running session anyway — confirm.
spark = SparkSession.builder.appName('Missing').getOrCreate()

In [65]:
## Read the dataset

# Training data for the regression example: header row as column names, inferred types.
csv_read_options = {'header': True, 'inferSchema': True}
training = spark.read.csv('test6.csv', **csv_read_options)

In [66]:
training.show()

+---------+---+----------+------+
|     Name|Age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sundhashu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



In [67]:
training.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Experience: integer (nullable = true)
 |-- Salary: integer (nullable = true)



In [68]:
training.columns

['Name', 'Age', 'Experience', 'Salary']

In [69]:
from pyspark.ml.feature import VectorAssembler

# Pack the Age and Experience columns into a single vector column named "Independent Features".
feature_cols = ['Age', 'Experience']
feature_essembler = VectorAssembler(inputCols=feature_cols, outputCol="Independent Features")

In [70]:
output = feature_essembler.transform(training)

In [71]:
output.show()

+---------+---+----------+------+--------------------+
|     Name|Age|Experience|Salary|Independent Features|
+---------+---+----------+------+--------------------+
|    Krish| 31|        10| 30000|         [31.0,10.0]|
|Sundhashu| 30|         8| 25000|          [30.0,8.0]|
|    Sunny| 29|         4| 20000|          [29.0,4.0]|
|     Paul| 24|         3| 20000|          [24.0,3.0]|
|   Harsha| 21|         1| 15000|          [21.0,1.0]|
|  Shubham| 23|         2| 18000|          [23.0,2.0]|
+---------+---+----------+------+--------------------+



In [72]:
output.columns

['Name', 'Age', 'Experience', 'Salary', 'Independent Features']

In [73]:
# Keep only the assembled feature vector and the Salary column.
model_columns = ('Independent Features', 'Salary')
finalized_data = output.select(*model_columns)

In [74]:
finalized_data.show()

+--------------------+------+
|Independent Features|Salary|
+--------------------+------+
|         [31.0,10.0]| 30000|
|          [30.0,8.0]| 25000|
|          [29.0,4.0]| 20000|
|          [24.0,3.0]| 20000|
|          [21.0,1.0]| 15000|
|          [23.0,2.0]| 18000|
+--------------------+------+



In [89]:
from pyspark.ml.regression import LinearRegression

## train test split
# Seed the split so the train/test rows, and every number derived from them,
# are the same on each Restart & Run All.
train_data, test_data = finalized_data.randomSplit([0.75, 0.25], seed=42)

# Keep the unfitted estimator under its own name; `regressor` is the fitted
# model, which is what the following cells inspect and evaluate.
linear_regression = LinearRegression(
    featuresCol='Independent Features',
    labelCol='Salary')
regressor = linear_regression.fit(train_data)

In [93]:
### Coefficients
regressor.coefficients

DenseVector([28.4757, 1271.3568])

In [94]:
regressor.intercept

14299.83249581293

In [95]:
### Prediction
# Evaluate the fitted model on the held-out split; the returned object is the source of
# the predictions table and the error metrics shown in the cells that follow.
pred_results = regressor.evaluate(test_data)

In [97]:
pred_results.predictions.show()

+--------------------+------+------------------+
|Independent Features|Salary|        prediction|
+--------------------+------+------------------+
|         [31.0,10.0]| 30000|27896.147403685143|
+--------------------+------+------------------+





In [98]:
pred_results.meanAbsoluteError, pred_results.meanSquaredError

(2103.852596314857, 4426195.747020763)