In [39]:
import findspark

# findspark.init() must run BEFORE `import pyspark`: it locates the local Spark
# installation and adds its Python libraries to sys.path. Calling it only after
# the pyspark imports works just on kernels where pyspark was already importable.
findspark.init()

import pathlib

import pyspark
from pyspark import SparkContext, SparkConf
from pyspark.sql import *  # kept for compatibility; provides e.g. SQLContext
from pyspark.sql import SparkSession


In [12]:
# Harmless to repeat if findspark.init() was already called during the imports.
findspark.init()

## Create (or reuse) the connection with the Spark cluster
# The master URL can be overridden with the SPARK_MASTER environment variable;
# it defaults to the standalone cluster this notebook was written against.
# SparkContext.getOrCreate makes the cell re-runnable: calling the SparkContext
# constructor a second time raises "Cannot run multiple SparkContexts at once".
# If a context already exists it is returned unchanged and this conf is ignored.
import os

SPARK_MASTER = os.environ.get('SPARK_MASTER', 'spark://192.168.0.112:7077')
sc = pyspark.SparkContext.getOrCreate(
    conf=pyspark.SparkConf().setMaster(SPARK_MASTER).setAppName("PySparkCommands"))

21/07/21 23:04:16 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [None]:
## Alternative: create a SparkSession (the Spark 2.0+ entry point) instead of a bare SparkContext ##
#spark = SparkSession.builder.appName('PysparkSession').getOrCreate()

In [13]:
## Verifying the connection
print(sc.master)
print(sc.appName)

## Create the SQL entry point
# SQLContext is deprecated since Spark 3.0 in favour of SparkSession.
# builder.getOrCreate() attaches to the already-running SparkContext `sc` and
# exposes the same `.read` API used by the cells below, so the existing variable
# name is kept for compatibility with them.
sqlconext = SparkSession.builder.getOrCreate()

spark://192.168.0.112:7077
PySparkCommands


In [25]:
# Absolute path of the first sample CSV, resolved against the kernel's working directory.
FilePath = pathlib.Path().resolve().joinpath('test1.csv')
print(FilePath)

/home/javier/Documents/DEV/PySpark/test1.csv


In [26]:
# Load test1.csv: first line is the header, fields are ',' separated, and column
# types are inferred by Spark (which costs an extra pass over the file).
# NOTE(review): with a standalone master, executors open this local path on their
# own machines, so the file must exist at the same path on every worker — confirm.
df = sqlconext.read.options(header=True, sep=',', inferSchema=True).csv(str(FilePath))

In [27]:
# Display up to 20 rows, truncating long cell values (Spark's defaults, spelled out).
df.show(n=20, truncate=True)

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



In [28]:
## Print the inferred schema as a tree: column name, type and nullability
df.printSchema()

root
 |-- Name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- Experience: integer (nullable = true)
 |-- Salary: integer (nullable = true)



In [30]:
## Select multiple Columns
## Select multiple columns
# The inferred column is lowercase `age`. 'Age' only resolved because Spark is
# case-insensitive by default; it also renamed the output header to "Age" and
# would fail with spark.sql.caseSensitive=true.
df.select(['Name', 'age']).show()

+---------+---+
|     Name|Age|
+---------+---+
|    Krish| 31|
|Sudhanshu| 30|
|    Sunny| 29|
|     Paul| 24|
|   Harsha| 21|
|  Shubham| 23|
+---------+---+



In [32]:
## Data types
## Data types
# A list of (column name, type name) pairs, displayed as the cell's output.
df.dtypes

[('Name', 'string'), ('age', 'int'), ('Experience', 'int'), ('Salary', 'int')]

In [34]:
## Describing Dataframe's Columns
## Summary statistics (count, mean, stddev, min, max) for every column
df.describe().show(n=20, truncate=True)

+-------+------+------------------+-----------------+------------------+
|summary|  Name|               age|       Experience|            Salary|
+-------+------+------------------+-----------------+------------------+
|  count|     6|                 6|                6|                 6|
|   mean|  null|26.333333333333332|4.666666666666667|21333.333333333332|
| stddev|  null| 4.179314138308661|3.559026084010437| 5354.126134736337|
|    min|Harsha|                21|                1|             15000|
|    max| Sunny|                31|               10|             30000|
+-------+------+------------------+-----------------+------------------+



In [36]:
## Adding/Creating new Columns
## Adding/creating new columns
# withColumn returns a new DataFrame; rebind `df` to keep the derived column.
df = df.withColumn(colName='Experience After 3 Years', col=df.Experience + 3)
df.show(n=20, truncate=True)

+---------+---+----------+------+------------------------+
|     Name|age|Experience|Salary|Experience After 3 Years|
+---------+---+----------+------+------------------------+
|    Krish| 31|        10| 30000|                      13|
|Sudhanshu| 30|         8| 25000|                      11|
|    Sunny| 29|         4| 20000|                       7|
|     Paul| 24|         3| 20000|                       6|
|   Harsha| 21|         1| 15000|                       4|
|  Shubham| 23|         2| 18000|                       5|
+---------+---+----------+------+------------------------+



In [37]:
## Dropping Columns
## Dropping columns
# drop() also returns a new DataFrame; rebinding `df` discards the derived column.
df = df.drop('Experience After 3 Years')
df.show(n=20, truncate=True)

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



In [38]:
## Renaming Columns
## Renaming columns
# The renamed frame is only displayed; `df` itself keeps the 'Name' column.
df.withColumnRenamed(existing='Name', new='Another Name').show(n=20, truncate=True)


+------------+---+----------+------+
|Another Name|age|Experience|Salary|
+------------+---+----------+------+
|       Krish| 31|        10| 30000|
|   Sudhanshu| 30|         8| 25000|
|       Sunny| 29|         4| 20000|
|        Paul| 24|         3| 20000|
|      Harsha| 21|         1| 15000|
|     Shubham| 23|         2| 18000|
+------------+---+----------+------+



In [40]:
# Absolute path of the second sample CSV (the one with missing values).
FilePath2 = pathlib.Path().resolve().joinpath('test2.csv')
print(FilePath2)

/home/javier/Documents/DEV/PySpark/test2.csv


In [41]:
# Load test2.csv with a header row and inferred column types; the separator is
# left at Spark's default (',').
df2 = sqlconext.read.options(header=True, inferSchema=True).csv(str(FilePath2))

In [42]:
# Display the raw second dataset, including its null cells.
df2.show(n=20, truncate=True)

+---------+----+----------+------+
|     Name| age|Experience|Salary|
+---------+----+----------+------+
|    Krish|  31|        10| 30000|
|Sudhanshu|  30|         8| 25000|
|    Sunny|  29|         4| 20000|
|     Paul|  24|         3| 20000|
|   Harsha|  21|         1| 15000|
|  Shubham|  23|         2| 18000|
|   Mahesh|null|      null| 40000|
|     null|  34|        10| 38000|
|     null|  36|      null|  null|
+---------+----+----------+------+



In [43]:
## Dropping Columns
# Display df2 without the 'Name' column (df2 itself is not modified).
df2.drop('Name').show(n=20, truncate=True)

+----+----------+------+
| age|Experience|Salary|
+----+----------+------+
|  31|        10| 30000|
|  30|         8| 25000|
|  29|         4| 20000|
|  24|         3| 20000|
|  21|         1| 15000|
|  23|         2| 18000|
|null|      null| 40000|
|  34|        10| 38000|
|  36|      null|  null|
+----+----------+------+



In [44]:
## Dropping rows
# how="any"
# With no arguments na.drop() uses how="any", spelled out here.
df2.na.drop(how="any").show(n=20, truncate=True)

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



In [45]:
## Dropping rows
# how="any" if one of the values for the row is null, it will be dropped
# DataFrame.dropna is the same operation as df.na.drop.
df2.dropna(how="any").show(n=20, truncate=True)

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



In [46]:
## Dropping rows
# thresh=n: keep only rows with at least n non-null values; rows with fewer
# than n non-nulls are dropped. When thresh is given it overrides `how`.
# Here only the last row (just `age` is non-null) is dropped; Mahesh's row
# (Name and Salary non-null) is kept.
df2.na.drop(thresh=2).show()

+---------+----+----------+------+
|     Name| age|Experience|Salary|
+---------+----+----------+------+
|    Krish|  31|        10| 30000|
|Sudhanshu|  30|         8| 25000|
|    Sunny|  29|         4| 20000|
|     Paul|  24|         3| 20000|
|   Harsha|  21|         1| 15000|
|  Shubham|  23|         2| 18000|
|   Mahesh|null|      null| 40000|
|     null|  34|        10| 38000|
+---------+----+----------+------+



In [47]:
## Dropping rows
## subset=['col1', 'col2', ...]: only these columns are checked for nulls
# The inferred column is lowercase `age`; matching its exact case keeps the
# cell working even when spark.sql.caseSensitive is enabled.
df2.na.drop(how="any", subset=['age']).show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
|     null| 34|        10| 38000|
|     null| 36|      null|  null|
+---------+---+----------+------+



In [66]:
## FIlling missing values
# df.na.fill('Value to fill', [List of columns to apply])
# fillna(value, subset) is the same operation as df.na.fill: nulls in the
# listed columns become `value`; other columns keep their nulls.
df2.fillna(value=10, subset=['age', 'Experience']).show(n=20, truncate=True)

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
|   Mahesh| 10|        10| 40000|
|     null| 34|        10| 38000|
|     null| 36|        10|  null|
+---------+---+----------+------+



In [67]:
## Original DataFrame
# The fill above returned a new DataFrame, so df2 still contains its nulls.
df2.show(n=20, truncate=True)

+---------+----+----------+------+
|     Name| age|Experience|Salary|
+---------+----+----------+------+
|    Krish|  31|        10| 30000|
|Sudhanshu|  30|         8| 25000|
|    Sunny|  29|         4| 20000|
|     Paul|  24|         3| 20000|
|   Harsha|  21|         1| 15000|
|  Shubham|  23|         2| 18000|
|   Mahesh|null|      null| 40000|
|     null|  34|        10| 38000|
|     null|  36|      null|  null|
+---------+----+----------+------+



In [71]:
## Imputing missing values
# Imputer replaces nulls in each input column with that column's mean and
# writes the result to a matching "<col>_imputed" column. The output names are
# derived from the same list as the inputs, so the two cannot drift apart.
# The inputs are integer columns, so the imputed values come back as integers
# (e.g. the mean age of 28.5 is shown as 28).
import numpy
from pyspark.ml.feature import Imputer

IMPUTE_COLS = ['age', 'Experience', 'Salary']
imputer = Imputer(
    inputCols=IMPUTE_COLS,
    outputCols=["{}_imputed".format(c) for c in IMPUTE_COLS],
    strategy="mean",
)

In [73]:
# Fit the column means on df2, then apply them to df2 and display the result.
imputer.fit(dataset=df2).transform(dataset=df2).show(n=20, truncate=True)

+---------+----+----------+------+-----------+------------------+--------------+
|     Name| age|Experience|Salary|age_imputed|Experience_imputed|Salary_imputed|
+---------+----+----------+------+-----------+------------------+--------------+
|    Krish|  31|        10| 30000|         31|                10|         30000|
|Sudhanshu|  30|         8| 25000|         30|                 8|         25000|
|    Sunny|  29|         4| 20000|         29|                 4|         20000|
|     Paul|  24|         3| 20000|         24|                 3|         20000|
|   Harsha|  21|         1| 15000|         21|                 1|         15000|
|  Shubham|  23|         2| 18000|         23|                 2|         18000|
|   Mahesh|null|      null| 40000|         28|                 5|         40000|
|     null|  34|        10| 38000|         34|                10|         38000|
|     null|  36|      null|  null|         36|                 5|         25750|
+---------+----+----------+-

In [74]:
## Stop the SparkContext; DataFrames built on it cannot be used after this.
sc.stop()