In [3]:
pip install pyspark


Note: you may need to restart the kernel to use updated packages.


In [4]:
from pyspark.sql import SparkSession

In [5]:
from pyspark.sql import SparkSession

# Create the Spark session for this notebook, or reuse one that already exists.
spark = (
    SparkSession.builder
    .appName("Practice")
    .getOrCreate()
)

In [6]:
# Read the CSV with default reader options (no header handling, no schema inference).
df_pyspark = spark.read.csv('basic_data.csv')

In [7]:
# Without the header option the CSV header row is shown as a data row and the
# columns get the auto-generated names _c0, _c1, _c2.
df_pyspark.show()

+-------+---+----------+
|    _c0|_c1|       _c2|
+-------+---+----------+
|   NAME|AGE|EXPERIENCE|
| Swapil| 27|         1|
|Neelesh| 28|         2|
|  Sukhi| 27|         3|
+-------+---+----------+



In [8]:
# Re-read the CSV, treating the first line as column names.
df_pyspark = spark.read.option('header','true').csv('basic_data.csv')

In [9]:
# The first CSV line is now used for the column names instead of being a data row.
df_pyspark.show()

+-------+---+----------+
|   NAME|AGE|EXPERIENCE|
+-------+---+----------+
| Swapil| 27|         1|
|Neelesh| 28|         2|
|  Sukhi| 27|         3|
+-------+---+----------+



In [10]:
# head() with no argument returns the first row as a single Row; AGE and
# EXPERIENCE are still strings at this point.
df_pyspark.head()

Row(NAME='Swapil', AGE='27', EXPERIENCE='1')

In [11]:
# Every column is typed as string because no schema inference was requested.
df_pyspark.printSchema()

root
 |-- NAME: string (nullable = true)
 |-- AGE: string (nullable = true)
 |-- EXPERIENCE: string (nullable = true)



In [12]:
# Re-read the same CSV, this time also inferring column types.
# Both settings are passed the same way (reader options with lowercase string
# values) instead of mixing .option() with a csv() keyword argument.
df_pyspark = (
    spark.read
    .option('header', 'true')
    .option('inferSchema', 'true')
    .csv('basic_data.csv')
)

In [13]:
# With schema inference AGE and EXPERIENCE are now typed as integer.
df_pyspark.printSchema()

root
 |-- NAME: string (nullable = true)
 |-- AGE: integer (nullable = true)
 |-- EXPERIENCE: integer (nullable = true)



In [14]:
# Read the CSV with header handling and schema inference passed as csv()
# keyword arguments, then display the result.
df_pyspark = spark.read.csv(
    'basic_data.csv',
    header=True,
    inferSchema=True,
)
df_pyspark.show()

+-------+---+----------+
|   NAME|AGE|EXPERIENCE|
+-------+---+----------+
| Swapil| 27|         1|
|Neelesh| 28|         2|
|  Sukhi| 27|         3|
+-------+---+----------+



In [15]:
# head(n) returns a list of the first n Row objects.
df_pyspark.head(3)

[Row(NAME='Swapil', AGE=27, EXPERIENCE=1),
 Row(NAME='Neelesh', AGE=28, EXPERIENCE=2),
 Row(NAME='Sukhi', AGE=27, EXPERIENCE=3)]

In [16]:
# Select a subset of columns and display them.
# The names differ in case from the CSV header (NAME / EXPERIENCE) yet still
# resolve, and the output uses the casing given here.
df_pyspark.select('Name', 'Experience').show()

+-------+----------+
|   Name|Experience|
+-------+----------+
| Swapil|         1|
|Neelesh|         2|
|  Sukhi|         3|
+-------+----------+



In [17]:
# Column names paired with their data types, as a list of (name, type) tuples.
df_pyspark.dtypes

[('NAME', 'string'), ('AGE', 'int'), ('EXPERIENCE', 'int')]

In [18]:
# Summary statistics (count, mean, stddev, min, max) for every column;
# mean and stddev are NULL for the string column NAME.
df_pyspark.describe().show()

+-------+-------+------------------+----------+
|summary|   NAME|               AGE|EXPERIENCE|
+-------+-------+------------------+----------+
|  count|      3|                 3|         3|
|   mean|   NULL|27.333333333333332|       2.0|
| stddev|   NULL|0.5773502691896258|       1.0|
|    min|Neelesh|                27|         1|
|    max| Swapil|                28|         3|
+-------+-------+------------------+----------+



In [19]:
# Derive a new column from EXPERIENCE (+2) into a separate DataFrame, leaving
# df_pyspark unchanged, and summarise the result.
df_pyspark2 = df_pyspark.withColumn(
    'Experience after 2 year',
    df_pyspark['Experience'] + 2,
)
df_pyspark2.describe().show()

+-------+-------+------------------+----------+-----------------------+
|summary|   NAME|               AGE|EXPERIENCE|Experience after 2 year|
+-------+-------+------------------+----------+-----------------------+
|  count|      3|                 3|         3|                      3|
|   mean|   NULL|27.333333333333332|       2.0|                    4.0|
| stddev|   NULL|0.5773502691896258|       1.0|                    1.0|
|    min|Neelesh|                27|         1|                      3|
|    max| Swapil|                28|         3|                      5|
+-------+-------+------------------+----------+-----------------------+



In [21]:
# Drop the derived column again; drop() returns a new DataFrame, which is
# assigned back to df_pyspark2.

df_pyspark2 = df_pyspark2.drop('Experience after 2 year')
df_pyspark2.show()

+-------+---+----------+
|   NAME|AGE|EXPERIENCE|
+-------+---+----------+
| Swapil| 27|         1|
|Neelesh| 28|         2|
|  Sukhi| 27|         3|
+-------+---+----------+



In [23]:
# Rename a column. The renamed DataFrame is only displayed here;
# df_pyspark2 itself is not reassigned.
df_pyspark2.withColumnRenamed('Name','New Name').show()

+--------+---+----------+
|New Name|AGE|EXPERIENCE|
+--------+---+----------+
|  Swapil| 27|         1|
| Neelesh| 28|         2|
|   Sukhi| 27|         3|
+--------+---+----------+



# Handling missing values in PySpark

In [125]:
# Load the CSV for the missing-values examples.
# NOTE(review): this is the same path as the earlier reads, yet the output below
# has extra rows containing NULLs and EXPERIENCE shown as decimals — the file
# appears to have changed between runs; confirm the intended source file.
df_pyspark3 = spark.read.csv('basic_data.csv', header=True, inferSchema=True)
df_pyspark3.show()

+-------+----+----------+
|   NAME| AGE|EXPERIENCE|
+-------+----+----------+
| Swapil|  27|       1.0|
|Neelesh|  28|       2.0|
|  Sukhi|  27|       3.0|
| Aditya|  26|      NULL|
|   NULL|  28|       4.0|
|   NULL|  35|       2.0|
|  Rohit|NULL|       4.0|
+-------+----+----------+



In [126]:
# Drop every row that contains at least one null value (default behaviour of
# na.drop(), as the output below shows). The result is only displayed;
# df_pyspark3 is not reassigned.
df_pyspark3.na.drop().show()

+-------+---+----------+
|   NAME|AGE|EXPERIENCE|
+-------+---+----------+
| Swapil| 27|       1.0|
|Neelesh| 28|       2.0|
|  Sukhi| 27|       3.0|
+-------+---+----------+



In [127]:
# Keep rows that have at least `thresh` non-null values.
# `how` is deliberately omitted: when `thresh` is given, PySpark ignores `how`,
# so passing how="any" alongside it was misleading. Every row here has at
# least two non-null values, which is why nothing is dropped.
df_pyspark3.na.drop(thresh=2).show()

+-------+----+----------+
|   NAME| AGE|EXPERIENCE|
+-------+----+----------+
| Swapil|  27|       1.0|
|Neelesh|  28|       2.0|
|  Sukhi|  27|       3.0|
| Aditya|  26|      NULL|
|   NULL|  28|       4.0|
|   NULL|  35|       2.0|
|  Rohit|NULL|       4.0|
+-------+----+----------+



In [128]:
# Restrict the null check to the listed columns: only rows with a null
# EXPERIENCE are dropped, nulls in NAME or AGE are kept.
df_pyspark3.na.drop(how='any', subset=['Experience']).show()

+-------+----+----------+
|   NAME| AGE|EXPERIENCE|
+-------+----+----------+
| Swapil|  27|       1.0|
|Neelesh|  28|       2.0|
|  Sukhi|  27|       3.0|
|   NULL|  28|       4.0|
|   NULL|  35|       2.0|
|  Rohit|NULL|       4.0|
+-------+----+----------+



In [129]:
# Fill missing values with a placeholder string.
# A string fill value only applies to string columns; the integer AGE column
# that was listed in the subset was silently skipped (its NULL stayed in the
# output), so it is left out here. Numeric columns need a numeric fill value
# or the Imputer used later in this notebook.
df_pyspark3.na.fill('Missing Values', subset=['NAME']).show()

+--------------+----+----------+
|          NAME| AGE|EXPERIENCE|
+--------------+----+----------+
|        Swapil|  27|       1.0|
|       Neelesh|  28|       2.0|
|         Sukhi|  27|       3.0|
|        Aditya|  26|      NULL|
|Missing Values|  28|       4.0|
|Missing Values|  35|       2.0|
|         Rohit|NULL|       4.0|
+--------------+----+----------+



In [130]:
# The earlier na.drop()/na.fill() calls were not assigned back, so
# df_pyspark3 still contains its original nulls.
df_pyspark3.show()

+-------+----+----------+
|   NAME| AGE|EXPERIENCE|
+-------+----+----------+
| Swapil|  27|       1.0|
|Neelesh|  28|       2.0|
|  Sukhi|  27|       3.0|
| Aditya|  26|      NULL|
|   NULL|  28|       4.0|
|   NULL|  35|       2.0|
|  Rohit|NULL|       4.0|
+-------+----+----------+



In [131]:
from pyspark.ml.feature import Imputer

# Median imputer for the two numeric columns. Results are written to new
# "<column>_imputed" columns rather than overwriting the inputs.
imputer = Imputer(
    strategy="median",
    inputCols=['AGE', 'EXPERIENCE'],
    outputCols=["{}_imputed".format(col_name) for col_name in ('AGE', 'EXPERIENCE')],
)


In [133]:
# Fit the imputer on the data, then apply it to the same DataFrame and display
# the result with the added *_imputed columns.
imputer_model = imputer.fit(df_pyspark3)
imputer_model.transform(df_pyspark3).show()

+-------+----+----------+-----------+------------------+
|   NAME| AGE|EXPERIENCE|AGE_imputed|EXPERIENCE_imputed|
+-------+----+----------+-----------+------------------+
| Swapil|  27|       1.0|         27|               1.0|
|Neelesh|  28|       2.0|         28|               2.0|
|  Sukhi|  27|       3.0|         27|               3.0|
| Aditya|  26|      NULL|         26|               2.0|
|   NULL|  28|       4.0|         28|               4.0|
|   NULL|  35|       2.0|         35|               2.0|
|  Rohit|NULL|       4.0|         27|               4.0|
+-------+----+----------+-----------+------------------+

